forked from mongodb/libmongocrypt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgenerate-test-data.py
141 lines (124 loc) · 4.76 KB
/
generate-test-data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/usr/bin/env python2
import boto3
import sys
import os
import bson
import uuid
from bson.codec_options import CodecOptions
from datetime import datetime
from bson import json_util
import json
import base64
# Usage text printed when any required AWS setting is absent from the environment.
instructions = """
This setup script prints a realistic example key document, schema, list collections responses, and KMS reply, and marking, to be used for test data.
Run this script from the root of the libmongocrypt repository.
Set the following environment variables:
AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY
AWS_REGION - e.g. "us-east-1"
AWS_CMK_ID - customer master key. e.g: arn:aws:kms:us-east-1:524754917239:key/70b3825c-602f-4d65-aeeb-087e565c6abc
"""

# Read all four settings in one pass. The first missing variable raises
# KeyError, in which case the usage text is printed and the script exits
# with status 1.
try:
    cmk_id, access_key_id, secret_access_key, region = (
        os.environ[variable]
        for variable in (
            "AWS_CMK_ID",
            "AWS_ACCESS_KEY_ID",
            "AWS_SECRET_ACCESS_KEY",
            "AWS_REGION",
        )
    )
except KeyError:
    print(instructions)
    sys.exit(1)
# bson.binary.STANDARD selects the standard UUID representation (BSON binary
# subtype 4). The same setting is used for BSON encoding and for the
# canonical extended-JSON output so both agree on how UUIDs are written.
codec_options = CodecOptions(uuid_representation=bson.binary.STANDARD)
json_options = json_util.JSONOptions(
    json_mode=json_util.JSONMode.CANONICAL,
    uuid_representation=bson.binary.STANDARD,
)

# KMS client built from the credentials and region read from the environment.
kms_client = boto3.client(
    "kms",
    region_name=region,
    aws_access_key_id=access_key_id,
    aws_secret_access_key=secret_access_key,
)
print ("A key document:")

# Take a single UTC timestamp for both date fields. BSON treats naive
# datetimes as UTC, so a local-time datetime.now() would be stored shifted by
# the local offset, and two separate calls could make creationDate and
# updateDate differ for a key that was only just created.
key_created_at = datetime.utcnow()

key_doc = {
    # Fixed, recognisable key id: sixteen "a" bytes.
    "_id": bson.binary.UUID(bytes=bytes(b"a" * 16)),
    "keyMaterial": None,  # to be filled
    "creationDate": key_created_at,
    "updateDate": key_created_at,
    "status": 1,  # active
    "masterKey": {  # not currently used in demo.
        "provider": "aws",
        "key": cmk_id,
        "region": region
    }
}

# Hard-coded plaintext key material, so the same key is encrypted on every run.
plaintext_key_material = base64.b64decode("TqhXy3tKckECjy4/ZNykMWG8amBF46isVPzeOgeusKrwheBmYaU8TMG5AHR/NeUDKukqo8hBGgogiQOVpLPkqBQHD8YkLsNbDmHoGOill5QAHnniF/Lz405bGucB5TfR")

# get real encrypted key material using AWS KMS.
response = kms_client.encrypt(KeyId=cmk_id, Plaintext=plaintext_key_material)
# Print the ciphertext length in bytes ahead of the key document.
print(len(response["CiphertextBlob"]))

# replace the key material after encrypting.
key_doc["keyMaterial"] = bson.binary.Binary(response["CiphertextBlob"])
print (json_util.dumps (key_doc, indent=4, json_options=json_options))
print ("A KMS reply for encrypted key material")

# JSON body of the reply: the CMK id plus the base64 text of the ciphertext
# currently stored in the key document.
ciphertext_b64 = base64.b64encode(key_doc["keyMaterial"]).decode("utf-8")
kms_encrypt_reply_json = json.dumps({
    "KeyId": cmk_id,
    "CiphertextBlob": ciphertext_b64
})

# Status line, headers, blank separator line, then the body. Content-Length
# is the length of the JSON body built above.
kms_encrypt_reply = "".join((
    "HTTP/1.1 200 OK\n",
    "x-amzn-RequestId: deeb35e5-4ecb-4bf1-9af5-84a54ff0af0e\n",
    "Content-Type: application/x-amz-json-1.1\n",
    "Content-Length: %d\n" % len(kms_encrypt_reply_json),
    "Connection: close\n\n",
    kms_encrypt_reply_json,
))
print (kms_encrypt_reply)
print ("\n\nA KMS reply for decrypting the key material")

# JSON body of the reply: the CMK id plus the base64 text of the plaintext
# key material.
plaintext_b64 = base64.b64encode(plaintext_key_material).decode("utf-8")
kms_decrypt_reply_json = json.dumps({
    "KeyId": cmk_id,
    "Plaintext": plaintext_b64
})

# Status line, headers, blank separator line, then the body. Content-Length
# is the length of the JSON body built above.
kms_reply = "".join((
    "HTTP/1.1 200 OK\n",
    "x-amzn-RequestId: deeb35e5-4ecb-4bf1-9af5-84a54ff0af0e\n",
    "Content-Type: application/x-amz-json-1.1\n",
    "Content-Length: %d\n\n" % len(kms_decrypt_reply_json),
    kms_decrypt_reply_json,
))
print (kms_reply)
print ("\n\nA marking indicating a value to be encrypted:")

# The value the marking asks to have encrypted. It must be carried inside the
# marking document as "v"; previously it was assigned but never used, so the
# generated marking contained no value at all.
value_to_mark = "457-55-5462"

# The marking is a BSON binary of subtype 6 whose payload is a single leading
# zero byte followed by a BSON document holding:
#   "a"  - set to 1 (presumably the algorithm identifier; confirm against the
#          subtype 6 specification)
#   "ki" - the id of the key document printed by this script
#   "v"  - the value to encrypt
marking = bson.binary.Binary(b"\x00" + bson.BSON.encode({
    "a": 1,
    "ki": key_doc["_id"],
    "v": value_to_mark
}, codec_options=codec_options), subtype=6)
print (json_util.dumps ({"marking": marking}, indent=4, json_options=json_options))
print ("\n\nA mongocryptd reply with the marking above:")

# The command carried in the reply: a find on "test" whose "ssn" filter value
# is the marking in place of a plain value.
marked_find_command = {
    "find": "test",
    "filter": {"ssn": marking}
}

# Full reply document wrapping the marked command.
marked_reply = {
    "result": marked_find_command,
    "hasEncryptedPlaceholders": True,
    "schemaRequiresEncryption": True,
    "ok": 1
}
print (json_util.dumps (marked_reply, indent=4, json_options=json_options))
print ("\n\nA listCollections result with a JSONSchema referencing the key:")


def _encrypted_string_field(algorithm):
    """Return the schema entry for a string field encrypted with *algorithm*.

    The entry references the key document's id as its only key id.
    """
    return {
        "encrypt": {
            "keyId": [key_doc["_id"]],
            "bsonType": "string",
            "algorithm": algorithm
        }
    }


# Collection description for "test" whose validator schema declares two
# encrypted string properties that differ only in their algorithm.
collection_info = {
    "name": "test",
    "type": "collection",
    "options": {
        "validator": {
            "$jsonSchema": {
                "bsonType": "object",
                "properties": {
                    "ssn": _encrypted_string_field("AEAD_AES_256_CBC_HMAC_SHA_512-Deterministic"),
                    "random": _encrypted_string_field("AEAD_AES_256_CBC_HMAC_SHA_512-Random")
                }
            }
        }
    }
}
print (json_util.dumps (collection_info, indent=4, json_options=json_options))