diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f83ae58 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +*.pyc +**/__pycache__/** \ No newline at end of file diff --git a/Object Store/func.py b/Object Store/func.py index 3caaea8..c8886b8 100644 --- a/Object Store/func.py +++ b/Object Store/func.py @@ -1,67 +1,72 @@ import io -import oci -import re import os +import gzip import json -import sys -import requests import logging -import time -import gzip -from fdk import response +import oci +import requests -def handler(ctx, data: io.BytesIO=None): +def handler(ctx, data: io.BytesIO = None): try: body = json.loads(data.getvalue()) - data = body.get("data", {}) - additional_details = data.get("additionalDetails", {}) - namespace = additional_details.get("namespace") - bucket = additional_details.get("bucketName") - obj = data.get("resourceName") - eventtime = body.get("eventTime") + except (Exception, ValueError) as ex: + logging.getLogger().info(str(ex)) + return - source = "Oracle Cloud" #adding a source name. - service = "OCI Logs" #adding a servicen name. + data = body.get("data", {}) + additional_details = data.get("additionalDetails", {}) - datafile = request_one_object(namespace, bucket, obj) - data = str(datafile,'utf-8') + namespace = additional_details.get("namespace") + if not namespace: + logging.getLogger().error("No namespace provided") + return + + bucket = additional_details.get("bucketName") + if not bucket: + logging.getLogger().error("No bucket provided") + return - datadoghost = os.environ['DATADOG_HOST'] - datadogtoken = os.environ['DATADOG_TOKEN'] + resource_name = data.get("resourceName") + if not resource_name: + logging.getLogger().error("No obj provided") + return - for lines in data.splitlines(): - logging.getLogger().info("lines " + lines) - payload = {} - payload.update({"host":obj}) - payload.update({"time": eventtime}) + event_time = body.get("eventTime") - payload.update({"ddsource":source}) - payload.update({"service":service}) + source = "Oracle Cloud" # Adding a source name. + service = "OCI Logs" # Adding a service name. - payload.update({"event":lines}) + datafile = request_one_object(namespace, bucket, resource_name) + data = str(datafile, 'utf-8') - - - headers = {'Content-type': 'application/json', 'DD-API-KEY': datadogtoken} - x = requests.post(datadoghost, data = json.dumps(payload), headers=headers) - logging.getLogger().info(x.text) - print(x.text) + dd_host = os.environ['DATADOG_HOST'] + dd_token = os.environ['DATADOG_TOKEN'] - except (Exception, ValueError) as ex: -# print(str(ex)) - logging.getLogger().info(str(ex)) - return + for lines in data.splitlines(): + logging.getLogger().info("lines %s", lines) + payload = {} + payload.update({"host": resource_name}) + payload.update({"time": event_time}) + + payload.update({"ddsource": source}) + payload.update({"service": service}) + + payload.update({"event": lines}) + + headers = {'Content-type': 'application/json', 'DD-API-KEY': dd_token} + req = requests.post(dd_host, data=json.dumps(payload), headers=headers) + logging.getLogger().info(req.text) -def request_one_object(namespace, bucket, obj): - assert bucket and obj - signer = oci.auth.signers.get_resource_principals_signer() - object_storage_client = oci.object_storage.ObjectStorageClient(config={}, signer=signer) - namespace = namespace - bucket_name = bucket - object_name = obj - get_obj = object_storage_client.get_object(namespace, bucket_name, object_name) +def request_one_object(namespace: str, bucket: str, resource_name: str): + """ + Calls OCI to request object from Object Storage Client and decompress + """ + oci_signer = oci.auth.signers.get_resource_principals_signer() + os_client = oci.object_storage.ObjectStorageClient(config={}, + signer=oci_signer) + get_obj = os_client.get_object(namespace, bucket, resource_name) bytes_read = gzip.decompress(get_obj.data.content) - return bytes_read \ No newline at end of file + return bytes_read diff --git a/README.md b/README.md index 857d67d..4df27a4 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,2 @@ # Oracle_Logs_Integration -Houses code for OCI's log collections pipeline. +Houses code for OCI's log collection pipeline. diff --git a/Service Connector Hub/func.py b/Service Connector Hub/func.py index e89f3e4..40c11f5 100644 --- a/Service Connector Hub/func.py +++ b/Service Connector Hub/func.py @@ -1,49 +1,55 @@ import io -import oci -import re import os import json -import sys -import requests import logging -import time -import gzip -from fdk import response -def process(body): +import requests + + +def process(body: dict): + data = body.get("data", {}) + source = body.get("source") + time = body.get("time") + dd_source = "oracle_cloud" + service = "OCI Logs" + + # Get json data, time, and source information + payload = {} + payload.update({"source": source}) + payload.update({"time": time}) + payload.update({"data": data}) + payload.update({"ddsource": dd_source}) + payload.update({"service": service}) + + # Datadog endpoint URL and token to call the REST interface. + # These are defined in the func.yaml file. try: - data = body.get("data", {}) - source = body.get("source") - time = body.get("time") - ddsource = "oracle_cloud" - service = "OCI Logs" - - #get json data, time, and source information - payload = {} - payload.update({"source":source}) - payload.update({"time": time}) - payload.update({"data":data}) - payload.update({"ddsource": ddsource}) - payload.update({"service":service}) - - #Datadog endpoint URL and token to call the REST interface. These are defined in the func.yaml file. - datadoghost = os.environ['DATADOG_HOST'] - datadogtoken = os.environ['DATADOG_TOKEN'] - - #Invoke Datadog API with the payload. If the payload contains more than one log this will be ingested as once. - headers = {'Content-type': 'application/json', 'DD-API-KEY': datadogtoken} - x = requests.post(datadoghost, data = json.dumps(payload), headers=headers) - logging.getLogger().info(x.text) + dd_host = os.environ['DATADOG_HOST'] + dd_token = os.environ['DATADOG_TOKEN'] + except KeyError: + err_msg = "Could not find environment variables, \ + please ensure DATADOG_HOST and DATADOG_TOKEN \ + are set as environment variables." + logging.getLogger().error(err_msg) + # Invoke Datadog API with the payload. + # If the payload contains more than one log + # this will be ingested at once. + try: + headers = {'Content-type': 'application/json', 'DD-API-KEY': dd_token} + x = requests.post(dd_host, data=json.dumps(payload), headers=headers) + logging.getLogger().info(x.text) except (Exception, ValueError) as ex: logging.getLogger().error(str(ex)) -""" -This function receives the logging json and invokes the Datadog endpoint for ingesting logs. https://docs.cloud.oracle.com/en-us/iaas/Content/Logging/Reference/top_level_logging_format. htm#top_level_logging_format -If this Function is invoked with more than one log the function go over each log and invokes the Datadog endpoint for ingesting one by one. -""" -def handler(ctx, data: io.BytesIO=None): +def handler(ctx, data: io.BytesIO = None): + """ + This function receives the logging json and invokes the Datadog endpoint + for ingesting logs. https://docs.cloud.oracle.com/en-us/iaas/Content/Logging/Reference/top_level_logging_format.htm#top_level_logging_format + If this Function is invoked with more than one log the function go over + each log and invokes the Datadog endpoint for ingesting one by one. + """ try: body = json.loads(data.getvalue()) if isinstance(body, list): diff --git a/Service Connector Hub/tests/test_func.py b/Service Connector Hub/tests/test_func.py index 7c192d2..dcdff8d 100644 --- a/Service Connector Hub/tests/test_func.py +++ b/Service Connector Hub/tests/test_func.py @@ -3,10 +3,12 @@ from func import handler from unittest import TestCase, mock + def to_BytesIO(str): """ Helper function to turn string test data into expected BytesIO asci encoded bytes """ return BytesIO(bytes(str, 'ascii')) + class TestLogForwarderFunction(TestCase): """ Test simple and batch format json CloudEvent payloads """ @@ -40,10 +42,10 @@ def testSimpleData(self, mock_post, ): handler(ctx=None, data=to_BytesIO(payload)) mock_post.assert_called_once() self.assertEqual(mock_post.mock_calls[0].kwargs['data'], - '{"source": "/mycontext", "time": "2018-04-05T17:31:00Z", "data": ' - '{"appinfoA": "abc", "appinfoB": 123, "appinfoC": true}, "ddsource": ' - '"oracle_cloud", "service": "OCI Logs"}' - ) + '{"source": "/mycontext", "time": "2018-04-05T17:31:00Z", "data": ' + '{"appinfoA": "abc", "appinfoB": 123, "appinfoC": true}, "ddsource": ' + '"oracle_cloud", "service": "OCI Logs"}' + ) @mock.patch("requests.post") def testBatchFormat(self, mock_post): @@ -80,10 +82,9 @@ def testBatchFormat(self, mock_post): """ handler(ctx=None, data=to_BytesIO(batch)) self.assertEqual(mock_post.call_count, 2, "Data was not successfully submitted for entire batch") - self.assertEqual( - [arg.kwargs['data'] for arg in mock_post.call_args_list], - ['{"source": "/mycontext/4", "time": "2018-04-05T17:31:00Z", "data": {}, ' - '"ddsource": "oracle_cloud", "service": "OCI Logs"}', - '{"source": "/mycontext/9", "time": "2018-04-05T17:31:05Z", "data": ' - '{"appinfoA": "potatoes", "appinfoB": 123, "appinfoC": true}, "ddsource": ' - '"oracle_cloud", "service": "OCI Logs"}']) \ No newline at end of file + self.assertEqual([arg.kwargs['data'] for arg in mock_post.call_args_list], + ['{"source": "/mycontext/4", "time": "2018-04-05T17:31:00Z", "data": {}, ' + '"ddsource": "oracle_cloud", "service": "OCI Logs"}', + '{"source": "/mycontext/9", "time": "2018-04-05T17:31:05Z", "data": ' + '{"appinfoA": "potatoes", "appinfoB": 123, "appinfoC": true}, "ddsource": ' + '"oracle_cloud", "service": "OCI Logs"}'])