-
Notifications
You must be signed in to change notification settings - Fork 295
/
Copy pathutils.py
161 lines (144 loc) · 6.47 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import time
from config import config
import boto3
from boto3.dynamodb.conditions import Key, Attr
import os
def type_check (val, Type):
if Type == 'dict':
return isinstance (val, dict)
if Type == 'list':
return isinstance (val, list)
if Type == 'str':
return isinstance (val, str)
if Type == 'int':
return isinstance (val, int)
if Type == 'tuple':
return isinstance (val, tuple)
if Type == 'fun':
return callable (val)
if Type == 'bool':
return type (val) == bool
def object_check (obj, key, Type):
if not type_check (obj, 'dict') or not key in obj:
return False
return type_check (obj [key], Type)
def timems ():
return int (round (time.time () * 1000))
def times ():
return int (round (time.time ()))
# *** DYNAMO DB ***
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html
db = boto3.client ('dynamodb', region_name = config ['dynamodb'] ['region'], aws_access_key_id = os.getenv ('AWS_DYNAMODB_ACCESS_KEY'), aws_secret_access_key = os.getenv ('AWS_DYNAMODB_SECRET_KEY'))
db_prefix = os.getenv ('AWS_DYNAMODB_TABLE_PREFIX')
# Encode a dict so that it has the format expected by DynamoDB
def db_encode (data, *args):
# args may contain a boolean flag which we use to detect whether we should format the payload for update_item
# when len (args) is truthy, we know we're dealing with update_item
processed_data = {}
for key in data:
if type_check (data [key], 'str'):
if len (args):
processed_data [key] = {'Value': {'S': data [key]}}
else:
processed_data [key] = {'S': data [key]}
elif type_check (data [key], 'int'):
# Note we convert the value into a string
if len (args):
processed_data [key] = {'Value': {'N': str (data [key])}}
else:
processed_data [key] = {'N': str (data [key])}
elif data [key] == None:
if len (args):
processed_data [key] = {'Action': 'DELETE'}
else:
processed_data [key] = {'NULL': True}
else:
raise ValueError ('Unsupported type passed to db_put')
return processed_data
# Decode data in DynamoDB format to a plain dict
def db_decode (data):
processed_data = {}
for key in data:
if 'S' in data [key]:
processed_data [key] = data [key] ['S']
elif 'N' in data [key]:
processed_data [key] = int (data [key] ['N'])
elif 'NULL' in data [key]:
processed_data [key] = None
else:
raise ValueError ('Unsupported type passed to db_put')
return processed_data
# This function takes a dict `data` and returns a new dict with only the key/value for the index key for the table.
# If *remove is truthy, then the index key is removed instead, leaving the rest of the keys intact.
def db_key (table, data, *remove):
processed_data = {}
if len (remove):
for key in data:
if table == 'users' and key == 'username':
continue
if (table == 'tokens' or table == 'programs') and key == 'id':
continue
processed_data [key] = data [key]
else:
if table == 'users':
processed_data ['username'] = data ['username']
if table == 'tokens' or table == 'programs':
processed_data ['id'] = data ['id']
return processed_data
# Gets an item by index from the database. If not_primary is truthy, the search is done by a field that should be set as a secondary index.
def db_get (table, data, *not_primary):
# If we're querying by something else than the primary key of the table, we assume that data contains only one field, that on which we want to search. We also require that field to have an index.
if len (not_primary):
field = list (data.keys ()) [0]
result = db.query (TableName = db_prefix + '-' + table, IndexName = field + '-index', KeyConditionExpression = field + ' = :value', ExpressionAttributeValues = {':value': {'S': data [field]}})
if len (result ['Items']):
return db_decode (result ['Items'] [0])
else:
return None
else:
result = db.get_item (TableName = db_prefix + '-' + table, Key = db_encode (db_key (table, data)))
if 'Item' not in result:
return None
return db_decode (result ['Item'])
# Gets an item by index from the database. If not_primary is truthy, the search is done by a field that should be set as a secondary index.
def db_get_many (table, data, *not_primary):
if len (not_primary):
field = list (data.keys ()) [0]
# We use ScanIndexForward = False to get the latest items from the Programs table
result = db.query (TableName = db_prefix + '-' + table, IndexName = field + '-index', KeyConditionExpression = field + ' = :value', ExpressionAttributeValues = {':value': {'S': data [field]}}, ScanIndexForward = False)
else:
result = db.query (TableName = db_prefix + '-' + table, Key = db_encode (db_key (table, data)))
data = []
for item in result ['Items']:
data.append (db_decode (item))
return data
# Creates or updates an item by primary key.
def db_set (table, data):
if db_get (table, data):
result = db.update_item (TableName = db_prefix + '-' + table, Key = db_encode (db_key (table, data)), AttributeUpdates = db_encode (db_key (table, data, True), True))
else:
result = db.put_item (TableName = db_prefix + '-' + table, Item = db_encode (data))
return result
# Deletes an item by primary key.
def db_del (table, data):
return db.delete_item (TableName = db_prefix + '-' + table, Key = db_encode (db_key (table, data)))
# Deletes multiple items.
def db_del_many (table, data, *not_primary):
# We define a recursive function in case the number of results is very large and cannot be returned with a single call to db_get_many.
def batch ():
to_delete = db_get_many (table, data, *not_primary)
if len (to_delete) == 0:
return
for item in to_delete:
db_del (table, db_key (table, item))
batch ()
batch ()
# Searches for items.
def db_scan (table):
result = db.scan (TableName = db_prefix + '-' + table)
output = []
for item in result ['Items']:
output.append (db_decode (item))
return output
def db_describe (table):
return db.describe_table (TableName = db_prefix + '-' + table)