verify sane log times in logging stack
This patch verifies that logs sent from logging pods can be queried on
the Elasticsearch pod within a reasonable amount of time.
juanvallejo committed Jul 20, 2017
1 parent 0b0e0d1 commit ede0374
Showing 4 changed files with 321 additions and 6 deletions.
11 changes: 6 additions & 5 deletions roles/openshift_health_checker/openshift_checks/logging/logging.py
@@ -12,20 +12,21 @@ class LoggingCheck(OpenShiftCheck):
"""Base class for logging component checks"""

name = "logging"
logging_namespace = "logging"

@classmethod
def is_active(cls, task_vars):
return super(LoggingCheck, cls).is_active(task_vars) and cls.is_first_master(task_vars)
logging_deployed = get_var(task_vars, "openshift_hosted_logging_deploy", default=False)
return super(LoggingCheck, cls).is_active(task_vars) and cls.is_first_master(task_vars) and logging_deployed

@staticmethod
def is_first_master(task_vars):
"""Run only on first master and only when logging is configured. Returns: bool"""
logging_deployed = get_var(task_vars, "openshift_hosted_logging_deploy", default=True)
"""Run only on first master. Returns: bool"""
# Note: It would be nice to use membership in oo_first_master group, however for now it
# seems best to avoid requiring that setup and just check this is the first master.
hostname = get_var(task_vars, "ansible_ssh_host") or [None]
masters = get_var(task_vars, "groups", "masters", default=None) or [None]
return logging_deployed and masters[0] == hostname
return masters and masters[0] == hostname

def run(self, tmp, task_vars):
pass
@@ -45,7 +46,7 @@ def get_pods_for_component(self, execute_module, namespace, logging_component, task_vars):
                 raise ValueError()
         except ValueError:
             # successful run but non-parsing data generally means there were no pods in the namespace
-            return None, 'There are no pods in the {} namespace. Is logging deployed?'.format(namespace)
+            return None, 'No pods were found for the "{}" logging component.'.format(logging_component)
 
         return pods['items'], None
 
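A note on the first hunk above: the openshift_hosted_logging_deploy gate moves from is_first_master into is_active, and its default flips from True to False, so logging checks now run only when logging is explicitly deployed. A minimal sketch of the resulting behavior (illustrative, not part of the patch; it assumes the base class's is_active returns True by default):

from openshift_checks.logging.logging import LoggingCheck

task_vars = {"ansible_ssh_host": "master1", "groups": {"masters": ["master1"]}}
assert not LoggingCheck.is_active(task_vars)   # logging not deployed: check skipped
task_vars["openshift_hosted_logging_deploy"] = True
assert LoggingCheck.is_active(task_vars)       # deployed, and this is the first master
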
132 changes: 132 additions & 0 deletions roles/openshift_health_checker/openshift_checks/logging/logging_index_time.py
@@ -0,0 +1,132 @@
"""
Check for ensuring logs from pods can be queried in a reasonable amount of time.
"""

import json
import time

from uuid import uuid4

from openshift_checks import get_var, OpenShiftCheckException
from openshift_checks.logging.logging import LoggingCheck


ES_CMD_TIMEOUT_SECONDS = 30


class LoggingIndexTime(LoggingCheck):
"""Check that pod logs are aggregated and indexed in ElasticSearch within a reasonable amount of time."""
name = "logging_index_time"
tags = ["health", "logging"]

logging_namespace = "logging"

def run(self, tmp, task_vars):
"""Add log entry by making unique request to Kibana. Check for unique entry in the ElasticSearch pod logs."""
try:
log_index_timeout = int(
get_var(task_vars, "openshift_check_logging_index_timeout_seconds", default=ES_CMD_TIMEOUT_SECONDS)
)
except ValueError:
return {
"failed": True,
"msg": ('Invalid value provided for "openshift_check_logging_index_timeout_seconds". '
'Value must be an integer representing an amount in seconds.'),
}

running_component_pods = dict()

# get all component pods
self.logging_namespace = get_var(task_vars, "openshift_logging_namespace", default=self.logging_namespace)
for component, name in (['kibana', 'Kibana'], ['es', 'Elasticsearch']):
pods, error = self.get_pods_for_component(
self.execute_module, self.logging_namespace, component, task_vars,
)

if error:
msg = 'Unable to retrieve pods for the {} logging component: {}'
return {"failed": True, "changed": False, "msg": msg.format(name, error)}

running_pods = self.running_pods(pods)

if not running_pods:
msg = ('No {} pods in the "Running" state were found.'
'At least one pod is required in order to perform this check.')
return {"failed": True, "changed": False, "msg": msg.format(name)}

running_component_pods[component] = running_pods

uuid = self.curl_kibana_with_uuid(running_component_pods["kibana"][0], task_vars)
self.wait_until_cmd_or_err(running_component_pods["es"][0], uuid, log_index_timeout, task_vars)
return {}

    def wait_until_cmd_or_err(self, es_pod, uuid, timeout_secs, task_vars):
        """Retry an Elasticsearch query every second until the query succeeds, or fail
        once a defined length of time has passed."""
        deadline = time.time() + timeout_secs
        interval = 1
        while not self.query_es_from_es(es_pod, uuid, task_vars):
            # stop retrying once the next attempt would land past the deadline
            if time.time() + interval > deadline:
                msg = "expecting match in Elasticsearch for message with uuid {}, but no matches were found after {}s."
                raise OpenShiftCheckException(msg.format(uuid, timeout_secs))
            time.sleep(interval)

    def curl_kibana_with_uuid(self, kibana_pod, task_vars):
        """curl Kibana with a unique uuid."""
        uuid = self.generate_uuid()
        pod_name = kibana_pod["metadata"]["name"]
        exec_cmd = "exec {pod_name} -c kibana -- curl --max-time 30 -s http://localhost:5601/{uuid}"
        exec_cmd = exec_cmd.format(pod_name=pod_name, uuid=uuid)

        error_str = self.exec_oc(self.execute_module, self.logging_namespace, exec_cmd, [], task_vars)

        try:
            error_code = json.loads(error_str)["statusCode"]
        except KeyError:
            msg = ('invalid response returned from Kibana request (Missing "statusCode" key):\n'
                   'Command: {}\nResponse: {}').format(exec_cmd, error_str)
            raise OpenShiftCheckException(msg)
        except ValueError:
            msg = ('invalid response returned from Kibana request (Non-JSON output):\n'
                   'Command: {}\nResponse: {}').format(exec_cmd, error_str)
            raise OpenShiftCheckException(msg)

        # a request for a nonexistent uuid path should always miss; a 404 from
        # Kibana confirms the request arrived and was written to its logs
        if error_code != 404:
            msg = 'invalid error code returned from Kibana request. Expecting error code "404", but got "{}" instead.'
            raise OpenShiftCheckException(msg.format(error_code))

        return uuid

    def query_es_from_es(self, es_pod, uuid, task_vars):
        """curl the Elasticsearch pod and look for a unique uuid in its logs."""
        pod_name = es_pod["metadata"]["name"]
        exec_cmd = (
            "exec {pod_name} -- curl --max-time 30 -s -f "
            "--cacert /etc/elasticsearch/secret/admin-ca "
            "--cert /etc/elasticsearch/secret/admin-cert "
            "--key /etc/elasticsearch/secret/admin-key "
            "https://logging-es:9200/project.{namespace}*/_count?q=message:{uuid}"
        )
        exec_cmd = exec_cmd.format(pod_name=pod_name, namespace=self.logging_namespace, uuid=uuid)
        result = self.exec_oc(self.execute_module, self.logging_namespace, exec_cmd, [], task_vars)

        try:
            count = json.loads(result)["count"]
        except KeyError:
            msg = 'invalid response from Elasticsearch query:\n"{}"\nMissing "count" key:\n{}'
            raise OpenShiftCheckException(msg.format(exec_cmd, result))
        except ValueError:
            msg = 'invalid response from Elasticsearch query:\n"{}"\nNon-JSON output:\n{}'
            raise OpenShiftCheckException(msg.format(exec_cmd, result))

        return count

    @staticmethod
    def running_pods(pods):
        """Filter pods that are running."""
        return [pod for pod in pods if pod['status']['phase'] == 'Running']

    @staticmethod
    def generate_uuid():
        """Wrap uuid generator. Allows for testing with expected values."""
        return str(uuid4())
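
For orientation, a minimal sketch (not part of the patch) of driving the new check outside Ansible by stubbing its two collaborators; a single canned exec_oc response can serve both call sites because curl_kibana_with_uuid reads only the "statusCode" key and query_es_from_es reads only the "count" key:

import json

from openshift_checks.logging.logging_index_time import LoggingIndexTime

check = LoggingIndexTime("dummy")  # the constructor argument mirrors the unit tests below
check.get_pods_for_component = lambda *args: (
    [{"metadata": {"name": "logging-pod-1"}, "status": {"phase": "Running"}}], None)
check.exec_oc = lambda *args: json.dumps({"statusCode": 404, "count": 1})

result = check.run(tmp=None, task_vars={"openshift_check_logging_index_timeout_seconds": "5"})
assert result == {}  # an empty dict means the check passed
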
2 changes: 1 addition & 1 deletion roles/openshift_health_checker/test/logging_check_test.py
@@ -118,7 +118,7 @@ def test_is_active(groups, logging_deployed, is_active):
     (
         'No resources found.',
         None,
-        'There are no pods in the logging namespace',
+        'No pods were found for the "es"',
     ),
     (
         json.dumps({'items': [plain_kibana_pod, plain_es_pod, plain_curator_pod, fluentd_pod_node1]}),
182 changes: 182 additions & 0 deletions roles/openshift_health_checker/test/logging_index_time_test.py
@@ -0,0 +1,182 @@
import json

import pytest

from openshift_checks.logging.logging_index_time import LoggingIndexTime, OpenShiftCheckException


SAMPLE_UUID = "unique-test-uuid"


def canned_loggingindextime(exec_oc=None):
    """Create a check object with a canned exec_oc method"""
    check = LoggingIndexTime("dummy")  # fails if a module is actually invoked
    if exec_oc:
        check.exec_oc = exec_oc
    return check


plain_running_elasticsearch_pod = {
    "metadata": {
        "labels": {"component": "es", "deploymentconfig": "logging-es-data-master"},
        "name": "logging-es-data-master-1",
    },
    "status": {
        "containerStatuses": [{"ready": True}, {"ready": True}],
        "phase": "Running",
    }
}
plain_running_kibana_pod = {
    "metadata": {
        "labels": {"component": "kibana", "deploymentconfig": "logging-kibana"},
        "name": "logging-kibana-1",
    },
    "status": {
        "containerStatuses": [{"ready": True}, {"ready": True}],
        "phase": "Running",
    }
}
not_running_kibana_pod = {
    "metadata": {
        "labels": {"component": "kibana", "deploymentconfig": "logging-kibana"},
        "name": "logging-kibana-2",
    },
    "status": {
        "containerStatuses": [{"ready": True}, {"ready": False}],
        "conditions": [{"status": "True", "type": "Ready"}],
        "phase": "pending",
    }
}


@pytest.mark.parametrize('pods, expect_pods', [
    (
        [not_running_kibana_pod],
        [],
    ),
    (
        [plain_running_kibana_pod],
        [plain_running_kibana_pod],
    ),
    (
        [],
        [],
    )
])
def test_check_running_pods(pods, expect_pods):
    check = canned_loggingindextime(None)
    pods = check.running_pods(pods)
    assert pods == expect_pods


@pytest.mark.parametrize('name, json_response, uuid, timeout, extra_words', [
    (
        'valid count in response',
        {
            "count": 1,
        },
        SAMPLE_UUID,
        0.001,
        [],
    ),
], ids=lambda argval: argval[0])
def test_wait_until_cmd_or_err_succeeds(name, json_response, uuid, timeout, extra_words):
    def exec_oc(execute_module, ns, exec_cmd, args, task_vars):
        return json.dumps(json_response)

    check = canned_loggingindextime(exec_oc)
    check.wait_until_cmd_or_err(plain_running_elasticsearch_pod, uuid, timeout, None)


@pytest.mark.parametrize('name, json_response, uuid, timeout, extra_words', [
    (
        'invalid json response',
        {
            "invalid_field": 1,
        },
        SAMPLE_UUID,
        0.001,
        ["invalid response", "Elasticsearch"],
    ),
    (
        'empty response',
        {},
        SAMPLE_UUID,
        0.001,
        ["invalid response", "Elasticsearch"],
    ),
    (
        'valid response but invalid match count',
        {
            "count": 0,
        },
        SAMPLE_UUID,
        0.005,
        ["expecting match", SAMPLE_UUID, "0.005s"],
    )
], ids=lambda argval: argval[0])
def test_wait_until_cmd_or_err(name, json_response, uuid, timeout, extra_words):
    def exec_oc(execute_module, ns, exec_cmd, args, task_vars):
        return json.dumps(json_response)

    check = canned_loggingindextime(exec_oc)
    with pytest.raises(OpenShiftCheckException) as error:
        check.wait_until_cmd_or_err(plain_running_elasticsearch_pod, uuid, timeout, None)

    for word in extra_words:
        assert word in str(error)


@pytest.mark.parametrize('name, json_response, uuid, extra_words', [
    (
        'correct response code, found unique id is returned',
        {
            "statusCode": 404,
        },
        "sample unique id",
        ["sample unique id"],
    ),
], ids=lambda argval: argval[0])
def test_curl_kibana_with_uuid(name, json_response, uuid, extra_words):
    def exec_oc(execute_module, ns, exec_cmd, args, task_vars):
        return json.dumps(json_response)

    check = canned_loggingindextime(exec_oc)
    check.generate_uuid = lambda: uuid

    result = check.curl_kibana_with_uuid(plain_running_kibana_pod, None)

    for word in extra_words:
        assert word in result


@pytest.mark.parametrize('name, json_response, uuid, extra_words', [
    (
        'invalid json response',
        {
            "invalid_field": "invalid",
        },
        SAMPLE_UUID,
        ["invalid response returned", 'Missing "statusCode" key'],
    ),
    (
        'wrong error code in response',
        {
            "statusCode": 500,
        },
        SAMPLE_UUID,
        ["Expecting error code", "500"],
    ),
], ids=lambda argval: argval[0])
def test_failed_curl_kibana_with_uuid(name, json_response, uuid, extra_words):
    def exec_oc(execute_module, ns, exec_cmd, args, task_vars):
        return json.dumps(json_response)

    check = canned_loggingindextime(exec_oc)
    check.generate_uuid = lambda: uuid

    with pytest.raises(OpenShiftCheckException) as error:
        check.curl_kibana_with_uuid(plain_running_kibana_pod, None)

    for word in extra_words:
        assert word in str(error)
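
One behavior the suite above leaves uncovered is the retry path in wait_until_cmd_or_err. A sketch of how it could be exercised in the same stub style (illustrative, not part of the patch; the fixed 1-second poll interval makes this noticeably slower than the sub-millisecond timeouts used above):

def flaky_exec_oc_factory(misses):
    """Return a stub whose first `misses` responses report zero matches."""
    calls = {"n": 0}

    def exec_oc(execute_module, ns, exec_cmd, args, task_vars):
        calls["n"] += 1
        return json.dumps({"count": 0 if calls["n"] <= misses else 1})
    return exec_oc


def test_wait_until_cmd_or_err_retries():
    # the third query reports a match, well before the 10 second deadline
    check = canned_loggingindextime(flaky_exec_oc_factory(misses=2))
    check.wait_until_cmd_or_err(plain_running_elasticsearch_pod, SAMPLE_UUID, 10, None)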
