Skip to content

Commit

Permalink
Add tests for update_defs_from_s3 and related functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Gilmer committed Oct 18, 2019
1 parent f0f6283 commit 6ab1836
Show file tree
Hide file tree
Showing 7 changed files with 352 additions and 17 deletions.
22 changes: 8 additions & 14 deletions clamav.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def current_library_search_path():
return rd_ld.findall(ld_verbose)


def update_defs_from_s3(bucket, prefix):
def update_defs_from_s3(s3_client, bucket, prefix):
create_dir(AV_DEFINITION_PATH)
to_download = {}
for file_prefix in AV_DEFINITION_FILE_PREFIXES:
Expand All @@ -54,8 +54,8 @@ def update_defs_from_s3(bucket, prefix):
filename = file_prefix + "." + file_suffix
s3_path = os.path.join(AV_DEFINITION_S3_PREFIX, filename)
local_path = os.path.join(AV_DEFINITION_PATH, filename)
s3_md5 = md5_from_s3_tags(bucket, s3_path)
s3_time = time_from_s3(bucket, s3_path)
s3_md5 = md5_from_s3_tags(s3_client, bucket, s3_path)
s3_time = time_from_s3(s3_client, bucket, s3_path)

if s3_best_time is not None and s3_time < s3_best_time:
print("Not downloading older file in series: %s" % filename)
Expand All @@ -75,22 +75,18 @@ def update_defs_from_s3(bucket, prefix):
"s3_path": s3_path,
"local_path": local_path,
}
return to_download

s3 = boto3.resource("s3")
for file in to_download.values():
s3.Bucket(bucket).download_file(file["s3_path"], file["local_path"])


def upload_defs_to_s3(bucket, prefix, local_path):
s3_client = boto3.client("s3")
def upload_defs_to_s3(s3_client, bucket, prefix, local_path):
for file_prefix in AV_DEFINITION_FILE_PREFIXES:
for file_suffix in AV_DEFINITION_FILE_SUFFIXES:
filename = file_prefix + "." + file_suffix
local_file_path = os.path.join(local_path, filename)
if os.path.exists(local_file_path):
local_file_md5 = md5_from_file(local_file_path)
if local_file_md5 != md5_from_s3_tags(
bucket, os.path.join(prefix, filename)
s3_client, bucket, os.path.join(prefix, filename)
):
print(
"Uploading %s to s3://%s"
Expand Down Expand Up @@ -148,8 +144,7 @@ def md5_from_file(filename):
return hash_md5.hexdigest()


def md5_from_s3_tags(bucket, key):
s3_client = boto3.client("s3")
def md5_from_s3_tags(s3_client, bucket, key):
try:
tags = s3_client.get_object_tagging(Bucket=bucket, Key=key)["TagSet"]
except botocore.exceptions.ClientError as e:
Expand All @@ -164,8 +159,7 @@ def md5_from_s3_tags(bucket, key):
return ""


def time_from_s3(bucket, key):
s3_client = boto3.client("s3")
def time_from_s3(s3_client, bucket, key):
try:
time = s3_client.head_object(Bucket=bucket, Key=key)["LastModified"]
except botocore.exceptions.ClientError as e:
Expand Down
262 changes: 262 additions & 0 deletions clamav_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,41 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import os
import re
import textwrap
import unittest

import boto3
import botocore.session
from botocore.stub import Stubber
import mock

from clamav import RE_SEARCH_DIR
from clamav import scan_output_to_json
from clamav import md5_from_s3_tags
from clamav import time_from_s3
from clamav import update_defs_from_s3
from common import AV_DEFINITION_FILE_PREFIXES
from common import AV_DEFINITION_FILE_SUFFIXES
from common import AV_DEFINITION_S3_PREFIX
from common import AV_SIGNATURE_OK


class TestClamAV(unittest.TestCase):
def setUp(self):
# Common data
self.s3_bucket_name = "test_bucket"
self.s3_key_name = "test_key"

# Clients and Resources
self.s3 = boto3.resource("s3")
self.s3_client = botocore.session.get_session().create_client("s3")
self.sns_client = botocore.session.get_session().create_client(
"sns", region_name="us-west-2"
)

def test_current_library_search_path(self):
# Calling `ld --verbose` returns a lot of text but the line to check is this one:
search_path = """SEARCH_DIR("=/usr/x86_64-redhat-linux/lib64"); SEARCH_DIR("=/usr/lib64"); SEARCH_DIR("=/usr/local/lib64"); SEARCH_DIR("=/lib64"); SEARCH_DIR("=/usr/x86_64-redhat-linux/lib"); SEARCH_DIR("=/usr/local/lib"); SEARCH_DIR("=/lib"); SEARCH_DIR("=/usr/lib");""" # noqa
Expand Down Expand Up @@ -88,3 +113,240 @@ def test_scan_output_to_json_infected(self):
summary = scan_output_to_json(output)
self.assertEqual(summary[file_path], signature)
self.assertEqual(summary["Infected files"], "1")

def test_md5_from_s3_tags_no_md5(self):
tag_set = {"TagSet": []}

s3_stubber = Stubber(self.s3_client)
get_object_tagging_response = tag_set
get_object_tagging_expected_params = {
"Bucket": self.s3_bucket_name,
"Key": self.s3_key_name,
}
s3_stubber.add_response(
"get_object_tagging",
get_object_tagging_response,
get_object_tagging_expected_params,
)
with s3_stubber:
md5_hash = md5_from_s3_tags(
self.s3_client, self.s3_bucket_name, self.s3_key_name
)
self.assertEquals("", md5_hash)

def test_md5_from_s3_tags_has_md5(self):
expected_md5_hash = "d41d8cd98f00b204e9800998ecf8427e"
tag_set = {"TagSet": [{"Key": "md5", "Value": expected_md5_hash}]}

s3_stubber = Stubber(self.s3_client)
get_object_tagging_response = tag_set
get_object_tagging_expected_params = {
"Bucket": self.s3_bucket_name,
"Key": self.s3_key_name,
}
s3_stubber.add_response(
"get_object_tagging",
get_object_tagging_response,
get_object_tagging_expected_params,
)
with s3_stubber:
md5_hash = md5_from_s3_tags(
self.s3_client, self.s3_bucket_name, self.s3_key_name
)
self.assertEquals(expected_md5_hash, md5_hash)

def test_time_from_s3(self):

expected_s3_time = datetime.datetime(2019, 1, 1)

s3_stubber = Stubber(self.s3_client)
head_object_response = {"LastModified": expected_s3_time}
head_object_expected_params = {
"Bucket": self.s3_bucket_name,
"Key": self.s3_key_name,
}
s3_stubber.add_response(
"head_object", head_object_response, head_object_expected_params
)
with s3_stubber:
s3_time = time_from_s3(
self.s3_client, self.s3_bucket_name, self.s3_key_name
)
self.assertEquals(expected_s3_time, s3_time)

@mock.patch("clamav.md5_from_file")
@mock.patch("common.os.path.exists")
def test_update_defs_from_s3(self, mock_exists, mock_md5_from_file):
expected_md5_hash = "d41d8cd98f00b204e9800998ecf8427e"
different_md5_hash = "d41d8cd98f00b204e9800998ecf8427f"

mock_md5_from_file.return_value = different_md5_hash

tag_set = {"TagSet": [{"Key": "md5", "Value": expected_md5_hash}]}
expected_s3_time = datetime.datetime(2019, 1, 1)

s3_stubber = Stubber(self.s3_client)

key_names = []
side_effect = []
for file_prefix in AV_DEFINITION_FILE_PREFIXES:
for file_suffix in AV_DEFINITION_FILE_SUFFIXES:
side_effect.extend([True, True])
filename = file_prefix + "." + file_suffix
key_names.append(os.path.join(AV_DEFINITION_S3_PREFIX, filename))
mock_exists.side_effect = side_effect

for s3_key_name in key_names:
get_object_tagging_response = tag_set
get_object_tagging_expected_params = {
"Bucket": self.s3_bucket_name,
"Key": s3_key_name,
}
s3_stubber.add_response(
"get_object_tagging",
get_object_tagging_response,
get_object_tagging_expected_params,
)
head_object_response = {"LastModified": expected_s3_time}
head_object_expected_params = {
"Bucket": self.s3_bucket_name,
"Key": s3_key_name,
}
s3_stubber.add_response(
"head_object", head_object_response, head_object_expected_params
)

expected_to_download = {
"bytecode": {
"local_path": "/tmp/clamav_defs/bytecode.cvd",
"s3_path": "clamav_defs/bytecode.cvd",
},
"daily": {
"local_path": "/tmp/clamav_defs/daily.cvd",
"s3_path": "clamav_defs/daily.cvd",
},
"main": {
"local_path": "/tmp/clamav_defs/main.cvd",
"s3_path": "clamav_defs/main.cvd",
},
}
with s3_stubber:
to_download = update_defs_from_s3(
self.s3_client, self.s3_bucket_name, AV_DEFINITION_S3_PREFIX
)
self.assertEquals(expected_to_download, to_download)

@mock.patch("clamav.md5_from_file")
@mock.patch("common.os.path.exists")
def test_update_defs_from_s3_same_hash(self, mock_exists, mock_md5_from_file):
expected_md5_hash = "d41d8cd98f00b204e9800998ecf8427e"
different_md5_hash = expected_md5_hash

mock_md5_from_file.return_value = different_md5_hash

tag_set = {"TagSet": [{"Key": "md5", "Value": expected_md5_hash}]}
expected_s3_time = datetime.datetime(2019, 1, 1)

s3_stubber = Stubber(self.s3_client)

key_names = []
side_effect = []
for file_prefix in AV_DEFINITION_FILE_PREFIXES:
for file_suffix in AV_DEFINITION_FILE_SUFFIXES:
side_effect.extend([True, True])
filename = file_prefix + "." + file_suffix
key_names.append(os.path.join(AV_DEFINITION_S3_PREFIX, filename))
mock_exists.side_effect = side_effect

for s3_key_name in key_names:
get_object_tagging_response = tag_set
get_object_tagging_expected_params = {
"Bucket": self.s3_bucket_name,
"Key": s3_key_name,
}
s3_stubber.add_response(
"get_object_tagging",
get_object_tagging_response,
get_object_tagging_expected_params,
)
head_object_response = {"LastModified": expected_s3_time}
head_object_expected_params = {
"Bucket": self.s3_bucket_name,
"Key": s3_key_name,
}
s3_stubber.add_response(
"head_object", head_object_response, head_object_expected_params
)

expected_to_download = {}
with s3_stubber:
to_download = update_defs_from_s3(
self.s3_client, self.s3_bucket_name, AV_DEFINITION_S3_PREFIX
)
self.assertEquals(expected_to_download, to_download)

@mock.patch("clamav.md5_from_file")
@mock.patch("common.os.path.exists")
def test_update_defs_from_s3_old_files(self, mock_exists, mock_md5_from_file):
expected_md5_hash = "d41d8cd98f00b204e9800998ecf8427e"
different_md5_hash = "d41d8cd98f00b204e9800998ecf8427f"

mock_md5_from_file.return_value = different_md5_hash

tag_set = {"TagSet": [{"Key": "md5", "Value": expected_md5_hash}]}
expected_s3_time = datetime.datetime(2019, 1, 1)

s3_stubber = Stubber(self.s3_client)

key_names = []
side_effect = []
for file_prefix in AV_DEFINITION_FILE_PREFIXES:
for file_suffix in AV_DEFINITION_FILE_SUFFIXES:
side_effect.extend([True, True])
filename = file_prefix + "." + file_suffix
key_names.append(os.path.join(AV_DEFINITION_S3_PREFIX, filename))
mock_exists.side_effect = side_effect

count = 0
for s3_key_name in key_names:
get_object_tagging_response = tag_set
get_object_tagging_expected_params = {
"Bucket": self.s3_bucket_name,
"Key": s3_key_name,
}
s3_stubber.add_response(
"get_object_tagging",
get_object_tagging_response,
get_object_tagging_expected_params,
)
head_object_response = {
"LastModified": expected_s3_time - datetime.timedelta(hours=count)
}
head_object_expected_params = {
"Bucket": self.s3_bucket_name,
"Key": s3_key_name,
}
s3_stubber.add_response(
"head_object", head_object_response, head_object_expected_params
)
count += 1

expected_to_download = {
"bytecode": {
"local_path": "/tmp/clamav_defs/bytecode.cld",
"s3_path": "clamav_defs/bytecode.cld",
},
"daily": {
"local_path": "/tmp/clamav_defs/daily.cld",
"s3_path": "clamav_defs/daily.cld",
},
"main": {
"local_path": "/tmp/clamav_defs/main.cld",
"s3_path": "clamav_defs/main.cld",
},
}
with s3_stubber:
to_download = update_defs_from_s3(
self.s3_client, self.s3_bucket_name, AV_DEFINITION_S3_PREFIX
)
self.assertEquals(expected_to_download, to_download)
1 change: 1 addition & 0 deletions common.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import errno
import datetime
import os
import os.path

AV_DEFINITION_S3_BUCKET = os.getenv("AV_DEFINITION_S3_BUCKET")
AV_DEFINITION_S3_PREFIX = os.getenv("AV_DEFINITION_S3_PREFIX", "clamav_defs")
Expand Down
Loading

0 comments on commit 6ab1836

Please sign in to comment.