forked from learning-zone/python-basics
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathetl_client.py
240 lines (200 loc) · 9.23 KB
/
etl_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
import json
class ETLClient:
    """Simple ETL client for a fake document service.

    Requirements

    1: run() must return a JSON string.
       See test_req_1
    2: run() must return a JSON string as a dictionary, containing:
         'doc-count':   [integer] the number of documents received
         'error-count': [integer] the number of errors received
         'docs':        [dictionary] keys are a document's ID string and
                        values are arrays of words in the document
       See test_req_2
    3: All of run()'s output words must be lower case.
       See test_req_3
    4: These words must not appear in the word array:
       and, or, not, but, to, in
       See test_req_4
    5: RetryImmediatelyError.
       The service's handle_request() may raise RetryImmediatelyError.
       Do not count the error against the number of requests to run.
       Retry the operation (by calling handle_request() again) until it
       succeeds. Include the number of errors in the output as described
       in 2 above.
       See test_req_5
    6: The service may ask you to 'update' docs. These requests look like:
         {'operation': 'update',
          'document': {
              'id':   [string] document id
              'data': [string] new document data
          }
         }
       Expect that the document ID will match an existing document
       previously sent by an 'add' operation. The document with the
       matching ID should have its data replaced with the data sent in the
       'update' operation.
       See test_req_6
    7: The service may ask you to 'delete' docs. These requests look like:
         {'operation': 'delete',
          'document-id': [string] document id
         }
       Expect that the document ID will match an existing document
       previously sent by an 'add' operation. Delete the document that
       matches that ID.
       See test_req_7
    """

    # Words that must never appear in a document's word array (requirement 4).
    _STOP_WORDS = frozenset(['and', 'or', 'not', 'but', 'to', 'in'])

    def run(self, service, max_requests):
        """
        Handle max_requests successful calls to the given document service.

        A call to service.handle_request() that raises RetryImmediatelyError
        is retried until it succeeds; each such error is tallied in
        'error-count' but does *not* count as one of the max_requests.
        RetryImmediatelyError is not defined in this class; it is expected
        to be provided by the module that defines the service.

        :param service: object exposing handle_request(), which returns an
            event dictionary with an 'operation' key ('add', 'update' or
            'delete'). Events with any other operation are ignored.
        :param max_requests: number of successful requests to process.
        :return: JSON string of a dictionary with the keys 'doc-count',
            'error-count' and 'docs' (document id -> list of words).
        """
        docs = {}
        error_count = 0

        for _ in range(max_requests):
            # Keep the try body down to the single call that can raise, so
            # an unrelated failure while processing is never mistaken for
            # a retry request.
            while True:
                try:
                    event = service.handle_request()
                except RetryImmediatelyError:
                    error_count += 1
                else:
                    break
            self._apply_event(docs, event)

        # 'doc-count' is derived from the stored documents so it can never
        # drift out of sync with 'docs'. Key order matches the original
        # output: doc-count, error-count, docs.
        return json.dumps({
            'doc-count': len(docs),
            'error-count': error_count,
            'docs': docs,
        })

    def _apply_event(self, docs, event):
        """Apply one service event to the docs dictionary in place."""
        operation = event['operation']
        if operation in ('add', 'update'):
            # Both carry {'document': {'id': ..., 'data': ...}}; an update
            # simply replaces the word list stored under the same id.
            document = event['document']
            docs[document['id']] = self.remove_words(document['data'])
        elif operation == 'delete':
            # pop() removes by key without iterating the dictionary, which
            # avoids "dictionary changed size during iteration" errors and
            # tolerates an id that is not present.
            docs.pop(event['document-id'], None)

    def remove_words(self, string):
        """
        Sanitize a document's text into its list of words.

        The text is lower-cased, split on runs of whitespace (so repeated
        spaces never produce empty words) and stripped of the stop words
        and, or, not, but, to, in.

        :param string: raw document text.
        :return: list of lower-case words in their original order.
        """
        # Build a new list rather than removing from the list being
        # iterated, which would skip the word after each removed one.
        return [word for word in string.lower().split()
                if word not in self._STOP_WORDS]
class Test(unittest.TestCase):
    """Requirement tests for ETLClient.run().

    Each test drives ETLClient against a DocumentService and checks the
    JSON it returns. The expected document ids and words are fixed, so the
    integer passed to DocumentService presumably seeds its output -- the
    service is not defined in this class; confirm against its definition.
    """

    def _ordered(self, obj):
        """
        Sort a multi-level object recursively.

        Dictionaries become sorted lists of (key, value) pairs and lists
        are sorted, so two structures that differ only in ordering compare
        equal. Any other value is returned unchanged.
        """
        if isinstance(obj, dict):
            return sorted((k, self._ordered(v)) for k, v in obj.items())
        if isinstance(obj, list):
            return sorted(self._ordered(x) for x in obj)
        return obj

    def _assert_same_content(self, actual_json, expected):
        """Assert that a JSON string decodes to expected, ignoring order.

        The JSON is parsed first: _ordered() leaves a plain string
        untouched, so normalising the serialized text would compare
        nothing but raw characters.
        """
        self.assertEqual(self._ordered(json.loads(actual_json)),
                         self._ordered(expected))

    def test_req_1(self):
        # json.dumps() returns str on both Python 2 and 3, whereas
        # basestring only exists on Python 2.
        self.assertIsInstance(ETLClient().run(DocumentService(2), 1), str)

    def test_req_2(self):
        r = ETLClient().run(DocumentService(2), 2)
        # Any of these is accepted: raw words, lower-cased words, or
        # lower-cased words with the stop words removed.
        expect = [
            {
                'doc-count': 2,
                'error-count': 0,
                'docs': {
                    'f01dba4999266bff87400756e8830528': "aliquid cum ut labore nesciunt Voluptatibus eius Fugiat Sunt not error Nulla vitae rerum".split(),
                    '87e8d5ee79eb735b2e4e4fb88a9438e9': "nihil natus Voluptatibus aperiam Quo".split()
                }
            },
            {
                'doc-count': 2,
                'error-count': 0,
                'docs': {
                    'f01dba4999266bff87400756e8830528': "aliquid cum ut labore nesciunt voluptatibus eius fugiat sunt not error nulla vitae rerum".split(),
                    '87e8d5ee79eb735b2e4e4fb88a9438e9': "nihil natus voluptatibus aperiam quo".split()
                }
            },
            {
                'doc-count': 2,
                'error-count': 0,
                'docs': {
                    'f01dba4999266bff87400756e8830528': "aliquid cum ut labore nesciunt voluptatibus eius fugiat sunt error nulla vitae rerum".split(),
                    '87e8d5ee79eb735b2e4e4fb88a9438e9': "nihil natus voluptatibus aperiam quo".split()
                }
            }
        ]
        # Compare decoded objects so the check does not hinge on the key
        # order json.dumps() happens to emit.
        self.assertIn(json.loads(r), expect)

    def test_req_3(self):
        r = ETLClient().run(DocumentService(2), 2)
        # Lower-cased output, with or without the stop words removed.
        expect = [
            {
                'doc-count': 2,
                'error-count': 0,
                'docs': {
                    'f01dba4999266bff87400756e8830528': "aliquid cum ut labore nesciunt voluptatibus eius fugiat sunt not error nulla vitae rerum".split(),
                    '87e8d5ee79eb735b2e4e4fb88a9438e9': "nihil natus voluptatibus aperiam quo".split()
                }
            },
            {
                'doc-count': 2,
                'error-count': 0,
                'docs': {
                    'f01dba4999266bff87400756e8830528': "aliquid cum ut labore nesciunt voluptatibus eius fugiat sunt error nulla vitae rerum".split(),
                    '87e8d5ee79eb735b2e4e4fb88a9438e9': "nihil natus voluptatibus aperiam quo".split()
                }
            }
        ]
        self.assertIn(json.loads(r), expect)

    def test_req_4(self):
        r = ETLClient().run(DocumentService(2), 2)
        expect = {
            'doc-count': 2,
            'error-count': 0,
            'docs': {
                'f01dba4999266bff87400756e8830528': "aliquid cum ut labore nesciunt voluptatibus eius fugiat sunt error nulla vitae rerum".split(),
                '87e8d5ee79eb735b2e4e4fb88a9438e9': "nihil natus voluptatibus aperiam quo".split()
            }
        }
        self._assert_same_content(r, expect)

    def test_req_5(self):
        r = ETLClient().run(DocumentService(2), 3)
        expect = {
            'doc-count': 3,
            'error-count': 1,
            'docs': {
                'f01dba4999266bff87400756e8830528': "aliquid cum ut labore nesciunt voluptatibus eius fugiat sunt error nulla vitae rerum".split(),
                '87e8d5ee79eb735b2e4e4fb88a9438e9': "nihil natus voluptatibus aperiam quo".split(),
                '43e18d33e6b052a1f0b04d22b60f2059': "non occaecati accusantium animi eius sit placeat fugit dolor voluptate iure a".split()
            }
        }
        self._assert_same_content(r, expect)

    def test_req_6(self):
        r = ETLClient().run(DocumentService(6), 3)
        expect = {
            'doc-count': 2,
            'error-count': 1,
            'docs': {
                'd740a7c5c4cb68a38c4cad51cc713a4f': 'quia consectetur maiores mollitia'.split(),
                '9491e81a43723db0c05e94662f06b6f3': 'ut accusantium recusandae animi velit labore id iure voluptate vel enim quo consequatur saepe'.split()
            }
        }
        self._assert_same_content(r, expect)

    def test_req_7(self):
        r = ETLClient().run(DocumentService(6), 9)
        expect = {
            'doc-count': 0,
            'error-count': 1,
            'docs': {
            }
        }
        self._assert_same_content(r, expect)