Skip to content

Commit

Permalink
Init format cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
dtru-ddog committed Oct 19, 2023
1 parent aeb693d commit 64ee5a5
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 94 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
*.pyc
**/__pycache__/**
99 changes: 52 additions & 47 deletions Object Store/func.py
Original file line number Diff line number Diff line change
@@ -1,67 +1,72 @@
import io
import oci
import re
import os
import gzip
import json
import sys
import requests
import logging
import time
import gzip

from fdk import response
import oci
import requests


def handler(ctx, data: io.BytesIO = None):
    """
    OCI Functions entry point for Object Storage events.

    Parses the event payload for the namespace, bucket and object name,
    downloads and decompresses the object via request_one_object, then
    forwards each line of it to the Datadog logs intake endpoint.

    :param ctx: Function invocation context (unused here).
    :param data: Raw event payload as BytesIO; expected to contain JSON.
    """
    logger = logging.getLogger()

    try:
        body = json.loads(data.getvalue())
    except (Exception, ValueError) as ex:
        logger.error("Could not parse event payload: %s", ex)
        return

    event_data = body.get("data", {})
    additional_details = event_data.get("additionalDetails", {})

    namespace = additional_details.get("namespace")
    if not namespace:
        logger.error("No namespace provided")
        return

    bucket = additional_details.get("bucketName")
    if not bucket:
        logger.error("No bucket provided")
        return

    resource_name = event_data.get("resourceName")
    if not resource_name:
        logger.error("No obj provided")
        return

    event_time = body.get("eventTime")

    source = "Oracle Cloud"  # Adding a source name.
    service = "OCI Logs"  # Adding a service name.

    # Datadog endpoint URL and token; these are defined in func.yaml.
    # Bug fix: guard the lookup so a missing variable is reported clearly
    # instead of surfacing as a generic KeyError later on.
    try:
        dd_host = os.environ['DATADOG_HOST']
        dd_token = os.environ['DATADOG_TOKEN']
    except KeyError:
        logger.error("Could not find environment variables, "
                     "please ensure DATADOG_HOST and DATADOG_TOKEN "
                     "are set as environment variables.")
        return

    # Fetch and gunzip the object, then treat its contents as UTF-8 text.
    datafile = request_one_object(namespace, bucket, resource_name)
    text = str(datafile, 'utf-8')

    # The headers are identical for every line; build them once, not per loop.
    headers = {'Content-type': 'application/json', 'DD-API-KEY': dd_token}

    for line in text.splitlines():
        logger.info("lines %s", line)
        payload = {
            "host": resource_name,
            "time": event_time,
            "ddsource": source,
            "service": service,
            "event": line,
        }
        try:
            req = requests.post(dd_host, data=json.dumps(payload),
                                headers=headers)
            logger.info(req.text)
        except (Exception, ValueError) as ex:
            logger.error(str(ex))


def request_one_object(namespace: str, bucket: str, resource_name: str) -> bytes:
    """
    Calls OCI to request an object from the Object Storage Client and
    decompress it.

    :param namespace: Object Storage namespace that owns the bucket.
    :param bucket: Name of the bucket holding the object.
    :param resource_name: Name of the (gzip-compressed) object to fetch.
    :return: The decompressed object contents as bytes.
    """
    # Resource-principal auth: the function authenticates as itself,
    # no local OCI config file is needed (hence config={}).
    oci_signer = oci.auth.signers.get_resource_principals_signer()
    os_client = oci.object_storage.ObjectStorageClient(config={},
                                                       signer=oci_signer)
    get_obj = os_client.get_object(namespace, bucket, resource_name)
    # Objects arrive gzip-compressed; return the raw decompressed bytes.
    return gzip.decompress(get_obj.data.content)
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# Oracle_Logs_Integration
Houses code for OCI's log collections pipeline.
Houses code for OCI's log collection pipeline.
76 changes: 41 additions & 35 deletions Service Connector Hub/func.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,55 @@
import io
import oci
import re
import os
import json
import sys
import requests
import logging
import time
import gzip
from fdk import response

def process(body: dict):
    """
    Forward a single CloudEvent log payload to Datadog.

    Builds the Datadog log payload from the event body and POSTs it to the
    endpoint configured via the DATADOG_HOST / DATADOG_TOKEN environment
    variables.

    :param body: Parsed CloudEvent JSON for one log entry.
    """
    logger = logging.getLogger()

    # Get json data, time, and source information.
    payload = {
        "source": body.get("source"),
        "time": body.get("time"),
        "data": body.get("data", {}),
        "ddsource": "oracle_cloud",
        "service": "OCI Logs",
    }

    # Datadog endpoint URL and token to call the REST interface.
    # These are defined in the func.yaml file.
    try:
        dd_host = os.environ['DATADOG_HOST']
        dd_token = os.environ['DATADOG_TOKEN']
    except KeyError:
        err_msg = "Could not find environment variables, \
please ensure DATADOG_HOST and DATADOG_TOKEN \
are set as environment variables."
        logger.error(err_msg)
        # Bug fix: without this return, dd_host/dd_token are unbound below
        # and the POST raises NameError (masked by the broad except).
        return

    # Invoke Datadog API with the payload.
    # If the payload contains more than one log
    # this will be ingested at once.
    try:
        headers = {'Content-type': 'application/json', 'DD-API-KEY': dd_token}
        resp = requests.post(dd_host, data=json.dumps(payload), headers=headers)
        logger.info(resp.text)
    except (Exception, ValueError) as ex:
        logger.error(str(ex))


"""
This function receives the logging json and invokes the Datadog endpoint for ingesting logs. https://docs.cloud.oracle.com/en-us/iaas/Content/Logging/Reference/top_level_logging_format. htm#top_level_logging_format
If this Function is invoked with more than one log the function go over each log and invokes the Datadog endpoint for ingesting one by one.
"""
def handler(ctx, data: io.BytesIO=None):
def handler(ctx, data: io.BytesIO = None):
"""
This function receives the logging json and invokes the Datadog endpoint
for ingesting logs. https://docs.cloud.oracle.com/en-us/iaas/Content/Logging/Reference/top_level_logging_format.htm#top_level_logging_format
    If this Function is invoked with more than one log, the function goes over
    each log and invokes the Datadog endpoint for ingesting them one by one.
"""
try:
body = json.loads(data.getvalue())
if isinstance(body, list):
Expand Down
23 changes: 12 additions & 11 deletions Service Connector Hub/tests/test_func.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
from func import handler
from unittest import TestCase, mock


def to_BytesIO(text):
    """Helper to turn string test data into ASCII-encoded BytesIO bytes.

    Parameter renamed from ``str`` (which shadowed the builtin); all
    callers pass it positionally, so the rename is backward-compatible.
    """
    return BytesIO(text.encode('ascii'))


class TestLogForwarderFunction(TestCase):
""" Test simple and batch format json CloudEvent payloads """

Expand Down Expand Up @@ -40,10 +42,10 @@ def testSimpleData(self, mock_post, ):
handler(ctx=None, data=to_BytesIO(payload))
mock_post.assert_called_once()
self.assertEqual(mock_post.mock_calls[0].kwargs['data'],
'{"source": "/mycontext", "time": "2018-04-05T17:31:00Z", "data": '
'{"appinfoA": "abc", "appinfoB": 123, "appinfoC": true}, "ddsource": '
'"oracle_cloud", "service": "OCI Logs"}'
)
'{"source": "/mycontext", "time": "2018-04-05T17:31:00Z", "data": '
'{"appinfoA": "abc", "appinfoB": 123, "appinfoC": true}, "ddsource": '
'"oracle_cloud", "service": "OCI Logs"}'
)

@mock.patch("requests.post")
def testBatchFormat(self, mock_post):
Expand Down Expand Up @@ -80,10 +82,9 @@ def testBatchFormat(self, mock_post):
"""
handler(ctx=None, data=to_BytesIO(batch))
self.assertEqual(mock_post.call_count, 2, "Data was not successfully submitted for entire batch")
self.assertEqual(
[arg.kwargs['data'] for arg in mock_post.call_args_list],
['{"source": "/mycontext/4", "time": "2018-04-05T17:31:00Z", "data": {}, '
'"ddsource": "oracle_cloud", "service": "OCI Logs"}',
'{"source": "/mycontext/9", "time": "2018-04-05T17:31:05Z", "data": '
'{"appinfoA": "potatoes", "appinfoB": 123, "appinfoC": true}, "ddsource": '
'"oracle_cloud", "service": "OCI Logs"}'])
self.assertEqual([arg.kwargs['data'] for arg in mock_post.call_args_list],
['{"source": "/mycontext/4", "time": "2018-04-05T17:31:00Z", "data": {}, '
'"ddsource": "oracle_cloud", "service": "OCI Logs"}',
'{"source": "/mycontext/9", "time": "2018-04-05T17:31:05Z", "data": '
'{"appinfoA": "potatoes", "appinfoB": 123, "appinfoC": true}, "ddsource": '
'"oracle_cloud", "service": "OCI Logs"}'])

0 comments on commit 64ee5a5

Please sign in to comment.