forked from MISP/misp-modules
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added support for malformed internationalized email headers
When an email contains headers that use Unicode without properly crafting them to conform to RFC-6323, the email import module would crash. (See issue MISP#119 & issue MISP#93.) To address this I have added additional layers of encoding/decoding to any possibly internationalized email headers. This decodes properly formed and malformed UTF-8, UTF-16, and UTF-32 headers appropriately. When an unknown encoding is encountered it is returned as an 'encoded-word' per RFC2047. This commit also adds unit tests that test properly formed and malformed UTF-8, UTF-16, UTF-32, and CJK encoded strings in all header fields; UTF-8, UTF-16, and UTF-32 encoded message bodies; and emoji testing for headers and attachment file names.
- Loading branch information
1 parent
3eecf9a
commit 40c71af
Showing
2 changed files
with
272 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ | |
import json | ||
import os | ||
import io | ||
import re | ||
import zipfile | ||
from hashlib import sha256 | ||
from email.mime.application import MIMEApplication | ||
|
@@ -316,7 +317,6 @@ def test_email_attachment_password_in_html_body(self): | |
query['data'] = decode_email(message) | ||
data = json.dumps(query) | ||
response = requests.post(self.url + "query", data=data) | ||
# print(response.json()) | ||
values = [x["values"] for x in response.json()["results"]] | ||
self.assertIn('EICAR.com', values) | ||
for i in response.json()['results']: | ||
|
@@ -341,10 +341,12 @@ def test_email_body_encoding(self): | |
message.attach(MIMEText(text, 'html', encoding[0])) | ||
query['data'] = decode_email(message) | ||
data = json.dumps(query) | ||
response = requests.post(self.url + "query", data=data) | ||
response = requests.post(self.url + "query", data=data).json() | ||
self.assertNotIn('error', response, response.get('error', "")) | ||
self.assertIn('results', response, "No server results found.") | ||
|
||
|
||
def test_email_header_encoding(self): | ||
def test_email_header_proper_encoding(self): | ||
query = {"module":"email_import"} | ||
query["config"] = {"unzip_attachments": None, | ||
"guess_zip_attachment_passwords": None, | ||
|
@@ -358,13 +360,236 @@ def test_email_header_encoding(self): | |
""" | ||
message.attach(MIMEText(text, 'plain')) | ||
for hdr, hdr_val in message.items(): | ||
# Encoding is used as the name of the file | ||
msg = message | ||
hdr_encoded = MIMEText(hdr_val.encode(encoding), 'plain', encoding) | ||
msg[hdr] = Header(hdr_val, encoding) | ||
encoded_header = hdr_val.encode(encoding) | ||
msg.replace_header(hdr, Header(encoded_header, encoding)) | ||
query['data'] = decode_email(msg) | ||
data = json.dumps(query) | ||
response = requests.post(self.url + "query", data=data) | ||
results = response.json()['results'] | ||
values = [] | ||
for x in results: | ||
# Remove BOM from UTF-16 strings | ||
if re.search('\ufeff', x["values"]): | ||
values.append(re.sub('\ufeff', "", x["values"])) | ||
else: | ||
values.append(x["values"]) | ||
types = {} | ||
for i in results: | ||
types.setdefault(i["type"], 0) | ||
types[i["type"]] += 1 | ||
# Check that all the items were correct | ||
self.assertEqual(types['target-email'], 1) | ||
self.assertIn('[email protected]', values) | ||
self.assertEqual(types['email-dst-display-name'], 4) | ||
self.assertIn('Last One', values) | ||
self.assertIn('Other Friend', values) | ||
self.assertIn('Second Person', values) | ||
self.assertIn('Testy Testerson', values) | ||
self.assertEqual(types['email-dst'], 4) | ||
self.assertIn('[email protected]', values) | ||
self.assertIn('[email protected]', values) | ||
self.assertIn('[email protected]', values) | ||
self.assertIn('[email protected]', values) | ||
self.assertEqual(types['email-src-display-name'], 2) | ||
self.assertIn("Innocent Person", values) | ||
self.assertEqual(types['email-src'], 2) | ||
self.assertIn("[email protected]", values) | ||
self.assertIn("[email protected]", values) | ||
self.assertEqual(types['email-thread-index'], 1) | ||
self.assertIn('AQHSR8Us3H3SoaY1oUy9AAwZfMF922bnA9GAgAAi9s4AAGvxAA==', values) | ||
self.assertEqual(types['email-message-id'], 1) | ||
self.assertIn("<[email protected]>", values) | ||
self.assertEqual(types['email-subject'], 1) | ||
self.assertIn("Example Message", values) | ||
self.assertEqual(types['email-header'], 1) | ||
self.assertEqual(types['email-x-mailer'], 1) | ||
self.assertIn("mlx 5.1.7", values) | ||
self.assertEqual(types['email-reply-to'], 1) | ||
self.assertIn("<CI7DgL-A6dm92s7gf4-88g@[email protected]>", values) | ||
|
||
self.assertIn("<CI7DgL-A6dm92s7gf4-88g@[email protected]>", values) | ||
|
||
def test_email_header_malformed_encoding(self): | ||
# Posts the base email to the email_import module with a header value swapped for its raw | ||
# UTF-8/UTF-16/UTF-32 bytes (no Header()/RFC 2047 wrapping), then checks the attribute | ||
# types and values found in the module's 'results'. | ||
query = {"module":"email_import"} | ||
query["config"] = {"unzip_attachments": None, | ||
"guess_zip_attachment_passwords": None, | ||
"extract_urls": None} | ||
filenames = os.listdir("tests/test_files/encodings") | ||
# NOTE(review): filenames is not referenced again in this function as shown. | ||
for encoding in ['utf-8', 'utf-16', 'utf-32']: | ||
message = get_base_email() | ||
text = """I am a test e-mail | ||
the password is NOT "this string". | ||
That is all. | ||
""" | ||
message.attach(MIMEText(text, 'plain')) | ||
for hdr, hdr_val in message.items(): | ||
# NOTE(review): msg is a second name for message, not a copy. | ||
msg = message | ||
encoded_header = hdr_val.encode(encoding) | ||
# NOTE(review): hdr_val is compiled as a regex without re.escape, so any regex | ||
# metacharacters in a header value are interpreted as pattern syntax; the replacement | ||
# bytes are likewise treated as a replacement template by sub(). | ||
pat = re.compile(hdr_val.encode()) | ||
message_bytes = pat.sub(encoded_header, msg.as_bytes()) | ||
# The module receives the raw message bytes base64-encoded in query['data']. | ||
message64 = base64.b64encode(message_bytes).decode() | ||
query['data'] = message64 | ||
|
||
# NOTE(review): indentation is not preserved in this view, so whether the request and | ||
# the assertions below run once per header or once per encoding cannot be told from here. | ||
data = json.dumps(query) | ||
response = requests.post(self.url + "query", data=data) | ||
results = response.json()['results'] | ||
values = [] | ||
for x in results: | ||
# Remove BOM from UTF-16 strings | ||
if re.search('\ufeff', x["values"]): | ||
values.append(re.sub('\ufeff', "", x["values"])) | ||
else: | ||
values.append(x["values"]) | ||
# Count how many results were returned for each attribute type. | ||
types = {} | ||
for i in results: | ||
types.setdefault(i["type"], 0) | ||
types[i["type"]] += 1 | ||
# Check that all the items were correct | ||
self.assertEqual(types['target-email'], 1) | ||
self.assertIn('[email protected]', values) | ||
self.assertEqual(types['email-dst-display-name'], 4) | ||
self.assertIn('Last One', values) | ||
self.assertIn('Other Friend', values) | ||
self.assertIn('Second Person', values) | ||
self.assertIn('Testy Testerson', values) | ||
self.assertEqual(types['email-dst'], 4) | ||
self.assertIn('[email protected]', values) | ||
self.assertIn('[email protected]', values) | ||
self.assertIn('[email protected]', values) | ||
self.assertIn('[email protected]', values) | ||
self.assertEqual(types['email-src-display-name'], 2) | ||
self.assertIn("Innocent Person", values) | ||
self.assertEqual(types['email-src'], 2) | ||
self.assertIn("[email protected]", values) | ||
self.assertIn("[email protected]", values) | ||
self.assertEqual(types['email-thread-index'], 1) | ||
self.assertIn('AQHSR8Us3H3SoaY1oUy9AAwZfMF922bnA9GAgAAi9s4AAGvxAA==', values) | ||
self.assertEqual(types['email-message-id'], 1) | ||
self.assertIn("<[email protected]>", values) | ||
self.assertEqual(types['email-subject'], 1) | ||
self.assertIn("Example Message", values) | ||
self.assertEqual(types['email-header'], 1) | ||
self.assertEqual(types['email-x-mailer'], 1) | ||
self.assertIn("mlx 5.1.7", values) | ||
self.assertEqual(types['email-reply-to'], 1) | ||
self.assertIn("<CI7DgL-A6dm92s7gf4-88g@[email protected]>", values) | ||
|
||
# NOTE(review): the next assertion repeats the previous one verbatim; possibly a | ||
# diff-rendering artifact rather than a second statement in the file - confirm. | ||
self.assertIn("<CI7DgL-A6dm92s7gf4-88g@[email protected]>", values) | ||
|
||
def test_email_header_CJK_encoding(self):
    """Verify that an RFC 2047 encoded Japanese subject is decoded.

    The Subject of the base email is replaced with a ``Header`` built from
    a Japanese string in the ``euc_jisx0213`` charset. The message is
    posted to the ``email_import`` module, and every ``email-subject``
    value it returns must equal the original string rather than its
    encoded-word form.
    """
    query = {"module": "email_import",
             "config": {"unzip_attachments": None,
                        "guess_zip_attachment_passwords": None,
                        "extract_urls": None}}
    message = get_base_email()
    text = ('I am a test e-mail\n'
            'the password is NOT "this string".\n'
            'That is all.\n')
    message.attach(MIMEText(text, 'plain'))
    japanese_charset = "ビット及び8ビットの2バイト情報交換用符号化拡張漢字集合"
    message.replace_header("Subject", Header(japanese_charset, 'euc_jisx0213'))
    query['data'] = decode_email(message)
    response = requests.post(self.url + "query", data=json.dumps(query)).json()
    # Surface a server-side failure with its own message instead of a
    # KeyError on 'results'.
    self.assertNotIn('error', response, response.get('error', ""))
    self.assertIn('results', response, "No server results found.")
    subjects = [item['values'] for item in response['results']
                if item['type'] == 'email-subject']
    # Without this check the per-subject assertions below would be skipped
    # (and the test would pass) whenever the module returns no subject.
    self.assertTrue(subjects, "No email-subject attribute was returned.")
    RFC_format = '=?euc_jisx0213?b?pdOlw6XItdqk0zil06XDpcikzjKl0KWkpci+8MrzuPK0uc3RyeS55rK9s8jEpbTBu/q9uLnn?='
    RFC_encoding_error = "The subject was not decoded from RFC2047 format."
    for subject in subjects:
        self.assertNotEqual(RFC_format, subject, RFC_encoding_error)
        self.assertEqual(japanese_charset, subject, "Subject not properly decoded")
|
||
def test_email_malformed_header_CJK_encoding(self):
    """Verify that a raw (non RFC 2047) UTF-8 Japanese subject is decoded.

    A placeholder Subject is written into the base email, and the
    placeholder is then swapped for the UTF-8 bytes of a Japanese string
    directly in the serialized message, so the header is not wrapped as an
    encoded-word. The bytes are base64-encoded and posted to the
    ``email_import`` module; every ``email-subject`` value it returns must
    equal the original string.
    """
    query = {"module": "email_import",
             "config": {"unzip_attachments": None,
                        "guess_zip_attachment_passwords": None,
                        "extract_urls": None}}
    message = get_base_email()
    text = ('I am a test e-mail\n'
            'the password is NOT "this string".\n'
            'That is all.\n')
    message.attach(MIMEText(text, 'plain'))
    japanese_charset = "ビット及び8ビットの2バイト情報交換用符号化拡張漢字集合"
    message.replace_header('Subject', "{{REPLACE}}")
    # Literal byte replacement: neither the placeholder nor the payload is
    # interpreted as regex syntax or as a replacement template.
    message_bytes = message.as_bytes().replace(b'{{REPLACE}}',
                                               japanese_charset.encode())
    query['data'] = base64.b64encode(message_bytes).decode()
    response = requests.post(self.url + "query", data=json.dumps(query)).json()
    # Surface a server-side failure with its own message instead of a
    # KeyError on 'results'.
    self.assertNotIn('error', response, response.get('error', ""))
    self.assertIn('results', response, "No server results found.")
    subjects = [item['values'] for item in response['results']
                if item['type'] == 'email-subject']
    # Without this check the per-subject assertions below would be skipped
    # (and the test would pass) whenever the module returns no subject.
    self.assertTrue(subjects, "No email-subject attribute was returned.")
    RFC_format = '=?euc_jisx0213?b?pdOlw6XItdqk0zil06XDpcikzjKl0KWkpci+8MrzuPK0uc3RyeS55rK9s8jEpbTBu/q9uLnn?='
    RFC_encoding_error = "The subject was not decoded from RFC2047 format."
    for subject in subjects:
        self.assertNotEqual(RFC_format, subject, RFC_encoding_error)
        self.assertEqual(japanese_charset, subject, "Subject not properly decoded")
|
||
def test_email_malformed_header_emoji_encoding(self):
    """Verify that a raw (non RFC 2047) UTF-8 subject with an emoji is decoded.

    A placeholder Subject is written into the base email, and the
    placeholder is then swapped for the UTF-8 bytes of an emoji-bearing
    string directly in the serialized message, so the header is not
    wrapped as an encoded-word. The bytes are base64-encoded and posted to
    the ``email_import`` module; every ``email-subject`` value it returns
    must equal the original string.
    """
    query = {"module": "email_import",
             "config": {"unzip_attachments": None,
                        "guess_zip_attachment_passwords": None,
                        "extract_urls": None}}
    message = get_base_email()
    text = ('I am a test e-mail\n'
            'the password is NOT "this string".\n'
            'That is all.\n')
    message.attach(MIMEText(text, 'plain'))
    emoji_string = "Emoji Test 👍 checking this"
    message.replace_header('Subject', "{{EMOJI}}")
    # Literal byte replacement: neither the placeholder nor the payload is
    # interpreted as regex syntax or as a replacement template.
    message_bytes = message.as_bytes().replace(b'{{EMOJI}}',
                                               emoji_string.encode())
    query['data'] = base64.b64encode(message_bytes).decode()
    response = requests.post(self.url + "query", data=json.dumps(query)).json()
    # Surface a server-side failure with its own message instead of a
    # KeyError on 'results'.
    self.assertNotIn('error', response, response.get('error', ""))
    self.assertIn('results', response, "No server results found.")
    subjects = [item['values'] for item in response['results']
                if item['type'] == 'email-subject']
    # Without this check the per-subject assertions below would be skipped
    # (and the test would pass) whenever the module returns no subject.
    self.assertTrue(subjects, "No email-subject attribute was returned.")
    RFC_format = "=?unknown-8bit?q?Emoji_Test_=F0=9F=91=8D_checking_this?="
    RFC_encoding_error = "The subject was not decoded from RFC2047 format."
    for subject in subjects:
        self.assertNotEqual(RFC_format, subject, RFC_encoding_error)
        self.assertEqual(emoji_string, subject, "Subject not properly decoded")
|
||
def test_email_attachment_emoji_filename(self):
    """Attach the EICAR file under a file name containing an emoji.

    The message is posted to the ``email_import`` module. The file name
    must appear among the returned values and on any ``email-attachment``
    result, and the decoded data of any ``malware-sample`` result is
    compared against the expected EICAR bytes.
    """
    emoji_name = "Emoji Test 👍 checking this"
    query = {"module": "email_import",
             "config": {"unzip_attachments": None,
                        "guess_zip_attachment_passwords": None,
                        "extract_urls": None}}
    message = get_base_email()
    message.attach(MIMEText("""I am a test e-mail""", 'plain'))
    with open("tests/EICAR.com", "rb") as eicar_file:
        attachment = MIMEApplication(eicar_file.read(), 'com')
    attachment.add_header('Content-Disposition', 'attachment',
                          filename=emoji_name)
    message.attach(attachment)
    query['data'] = decode_email(message)
    response = requests.post(self.url + "query", data=json.dumps(query))
    results = response.json()['results']
    self.assertIn(emoji_name, [entry["values"] for entry in results])
    for entry in results:
        kind = entry["type"]
        if kind == 'email-attachment':
            self.assertEqual(entry["values"], emoji_name)
        if kind == 'malware-sample':
            decoded = base64.b64decode(entry["data"])
            self.assertEqual(decoded, b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
|
||
|
||
def test_email_attachment_password_in_subject(self): | ||
query = {"module": "email_import"} | ||
|