Skip to content

Commit

Permalink
add elasticsearch, fluentd, kibana check
Browse files Browse the repository at this point in the history
  • Loading branch information
juanvallejo committed Jun 2, 2017
1 parent 46dca9b commit 2e53dbb
Show file tree
Hide file tree
Showing 13 changed files with 1,575 additions and 6 deletions.
74 changes: 74 additions & 0 deletions roles/openshift_health_checker/library/ocutil.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#!/usr/bin/python
"""Interface to OpenShift oc command"""

import os
import shlex
import shutil
import subprocess

from ansible.module_utils.basic import AnsibleModule


# Extra directories searched in addition to $PATH; oc commonly lands in
# /usr/local/bin or ~/bin, which may be stripped under ansible/sudo.
ADDITIONAL_PATH_LOOKUPS = ['/usr/local/bin', os.path.expanduser('~/bin')]


def locate_oc_binary():
    """Find and return the path to the oc binary.

    Searches $PATH plus ADDITIONAL_PATH_LOOKUPS; see
    https://github.com/openshift/openshift-ansible/issues/3410 for why
    $PATH alone is not enough.

    Returns the located path, or the bare name 'oc' when nothing is
    found (so the eventual exec failure still produces a useful error).
    """
    paths = os.environ.get("PATH", os.defpath).split(os.pathsep) + ADDITIONAL_PATH_LOOKUPS

    oc_binary = 'oc'

    # shutil.which is Python 3 only; fall back to a manual search on
    # interpreters where it is missing.
    try:
        which_result = shutil.which(oc_binary, path=os.pathsep.join(paths))
        if which_result is not None:
            oc_binary = which_result
    except AttributeError:
        for path in paths:
            candidate = os.path.join(path, oc_binary)
            # Match shutil.which semantics: the candidate must be a
            # regular, executable file. A plain os.path.exists check
            # would wrongly accept directories or non-executable files.
            if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
                oc_binary = candidate
                break

    return oc_binary


def main():
    """Ansible module that runs an oc command against a remote cluster.

    Parameters (via AnsibleModule): namespace, config_file, and a cmd
    string that is shlex-split onto the oc invocation; extra_args is
    accepted but currently unused. Exits with failed=True and the
    captured error text in 'result' when the command cannot be run or
    returns a nonzero status.
    """
    module = AnsibleModule(
        argument_spec=dict(
            namespace=dict(type="str", required=True),
            config_file=dict(type="str", required=True),
            cmd=dict(type="str", required=True),
            extra_args=dict(type="list", default=[]),
        ),
    )

    oc_command = [
        locate_oc_binary(),
        '--config', module.params["config_file"],
        '-n', module.params["namespace"],
    ] + shlex.split(module.params["cmd"])

    try:
        output = subprocess.check_output(oc_command, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as exc:
        # Nonzero exit from oc: report rc, the command line, and output.
        failed = True
        result = '[rc {}] {}\n{}'.format(exc.returncode, ' '.join(exc.cmd), exc.output)
    except OSError as exc:
        # Raised when the 'oc' binary itself cannot be executed.
        failed = True
        result = str(exc)
    else:
        failed = False
        result = output

    module.exit_json(
        changed=False,
        failed=failed,
        result=result,
    )


if __name__ == '__main__':
    main()
22 changes: 16 additions & 6 deletions roles/openshift_health_checker/openshift_checks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,26 @@ def subclasses(cls):
# File names that do not define checks (package init, shared mixins, the
# LoggingCheck base module) and therefore must be skipped by load_checks().
LOADER_EXCLUDES = (
    "__init__.py",
    "mixins.py",
    "logging.py",
)


def load_checks(path=None, subpkg=""):
    """Dynamically import all check modules for the side effect of registering checks.

    Recurses into subdirectories so checks in sub-packages (e.g. 'logging')
    are registered too.

    path: directory to scan; defaults to this package's own directory.
    subpkg: dotted suffix relative to __package__ for the directory being
        scanned; grows on each recursive call.
    Returns the list of imported modules.
    """
    if path is None:
        path = os.path.dirname(__file__)

    modules = []

    for name in os.listdir(path):
        if os.path.isdir(os.path.join(path, name)):
            # Skip non-package directories such as __pycache__; they
            # contain no importable check modules.
            if name.startswith("__"):
                continue
            modules = modules + load_checks(os.path.join(path, name), subpkg + "." + name)
            continue

        if name.endswith(".py") and name not in LOADER_EXCLUDES:
            modules.append(import_module(__package__ + subpkg + "." + name[:-3]))

    return modules


def get_var(task_vars, *keys, **kwargs):
Expand Down
Empty file.
61 changes: 61 additions & 0 deletions roles/openshift_health_checker/openshift_checks/logging/curator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""
Module for performing checks on a Curator logging deployment
"""

from openshift_checks import get_var
from openshift_checks.logging.logging import LoggingCheck


class Curator(LoggingCheck):
    """Module that checks an integrated logging Curator deployment"""

    name = "curator"
    tags = ["health", "logging"]

    # Namespace the logging stack is deployed in; set from inventory in run().
    logging_namespace = None

    def run(self, tmp, task_vars):
        """Check various things and gather errors. Returns: result as hash"""

        self.logging_namespace = get_var(task_vars, "openshift_logging_namespace", default="logging")
        # Pass execute_module, as the sibling Elasticsearch check does;
        # module_executor is an older alias for the same callable.
        curator_pods, error = super(Curator, self).get_pods_for_component(
            self.execute_module,
            self.logging_namespace,
            "curator",
            task_vars
        )
        if error:
            return {"failed": True, "changed": False, "msg": error}
        check_error = self.check_curator(curator_pods)

        if check_error:
            msg = ("The following Curator deployment issue was found:"
                   "\n-------\n"
                   "{}".format(check_error))
            return {"failed": True, "changed": False, "msg": msg}

        # TODO(lmeyer): run it all again for the ops cluster
        return {"failed": False, "changed": False, "msg": 'No problems found with Curator deployment.'}

    def check_curator(self, pods):
        """Check to see if curator is up and working. Returns: error string, or None if healthy."""
        if not pods:
            return (
                "There are no Curator pods for the logging stack,\n"
                "so nothing will prune Elasticsearch indexes.\n"
                "Is Curator correctly deployed?"
            )

        not_running = super(Curator, self).not_running_pods(pods)
        if len(not_running) == len(pods):
            return (
                "The Curator pod is not currently in a running state,\n"
                "so Elasticsearch indexes may increase without bound."
            )
        # Running pod count is total minus not-running; more than one is
        # unexpected but harmless.
        if len(pods) - len(not_running) > 1:
            return (
                "There is more than one Curator pod running. This should not normally happen.\n"
                "Although this doesn't cause any problems, you may want to investigate."
            )

        return None
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
"""
Module for performing checks on an Elasticsearch logging deployment
"""

import json
import re

from openshift_checks import get_var
from openshift_checks.logging.logging import LoggingCheck


class Elasticsearch(LoggingCheck):
    """Module that checks an integrated logging Elasticsearch deployment"""

    name = "elasticsearch"
    tags = ["health", "logging"]

    # Namespace the logging stack is deployed in; set from inventory in run().
    logging_namespace = None

    def run(self, tmp, task_vars):
        """Check various things and gather errors. Returns: result as hash"""

        self.logging_namespace = get_var(task_vars, "openshift_logging_namespace", default="logging")
        es_pods, error = super(Elasticsearch, self).get_pods_for_component(
            self.execute_module,
            self.logging_namespace,
            "es",
            task_vars,
        )
        if error:
            return {"failed": True, "changed": False, "msg": error}
        check_error = self.check_elasticsearch(es_pods, task_vars)

        if check_error:
            msg = ("The following Elasticsearch deployment issue was found:"
                   "\n-------\n"
                   "{}".format(check_error))
            return {"failed": True, "changed": False, "msg": msg}

        # TODO(lmeyer): run it all again for the ops cluster
        return {"failed": False, "changed": False, "msg": 'No problems found with Elasticsearch deployment.'}

    def _not_running_elasticsearch_pods(self, es_pods):
        """Returns: list of pods not in a running state, list of errors about those pods"""
        not_running = super(Elasticsearch, self).not_running_pods(es_pods)
        if not_running:
            return not_running, [(
                'The following Elasticsearch pods are not running:\n'
                '{pods}'
                'These pods will not aggregate logs from their nodes.'
            ).format(pods=''.join(
                "  {} ({})\n".format(pod['metadata']['name'], pod['spec'].get('host', 'None'))
                for pod in not_running
            ))]
        return not_running, []

    def check_elasticsearch(self, es_pods, task_vars):
        """Various checks for elasticsearch. Returns: error string"""
        not_running_pods, error_msgs = self._not_running_elasticsearch_pods(es_pods)
        running_pods = [pod for pod in es_pods if pod not in not_running_pods]
        pods_by_name = {
            pod['metadata']['name']: pod for pod in running_pods
            # Filter out pods that are not members of a DC
            if pod['metadata'].get('labels', {}).get('deploymentconfig')
        }
        if not pods_by_name:
            return 'No logging Elasticsearch pods were found. Is logging deployed?'
        error_msgs += self._check_elasticsearch_masters(pods_by_name, task_vars)
        error_msgs += self._check_elasticsearch_node_list(pods_by_name, task_vars)
        error_msgs += self._check_es_cluster_health(pods_by_name, task_vars)
        error_msgs += self._check_elasticsearch_diskspace(pods_by_name, task_vars)
        return '\n'.join(error_msgs)

    @staticmethod
    def _build_es_curl_cmd(pod_name, url):
        """Return an 'oc exec' command string that curls the given ES URL with admin certs."""
        base = "exec {name} -- curl -s --cert {base}cert --key {base}key --cacert {base}ca -XGET '{url}'"
        return base.format(base="/etc/elasticsearch/secret/admin-", name=pod_name, url=url)

    def _check_elasticsearch_masters(self, pods_by_name, task_vars):
        """Check that Elasticsearch masters are sane. Returns: list of error strings"""
        es_master_names = set()
        error_msgs = []
        for pod_name in pods_by_name.keys():
            # Compare what each ES node reports as master and compare for split brain
            get_master_cmd = self._build_es_curl_cmd(pod_name, "https://localhost:9200/_cat/master")
            master_name_str = self._exec_oc(get_master_cmd, [], task_vars)
            master_names = (master_name_str or '').split(' ')
            if len(master_names) > 1:
                es_master_names.add(master_names[1])
            else:
                error_msgs.append(
                    'No master? Elasticsearch {pod} returned bad string when asked master name:\n'
                    '  {response}'.format(pod=pod_name, response=master_name_str)
                )

        if not es_master_names:
            error_msgs.append('No logging Elasticsearch masters were found. Is logging deployed?')
            # Return the list itself: the caller extends its own list with
            # this return value, so returning a joined string here (as the
            # original did) would splice in individual characters.
            return error_msgs

        if len(es_master_names) > 1:
            error_msgs.append(
                'Found multiple Elasticsearch masters according to the pods:\n'
                '{master_list}\n'
                'This implies that the masters have "split brain" and are not correctly\n'
                'replicating data for the logging cluster. Log loss is likely to occur.'
                .format(master_list='\n'.join('  ' + master for master in es_master_names))
            )

        return error_msgs

    def _check_elasticsearch_node_list(self, pods_by_name, task_vars):
        """Check that reported ES masters are accounted for by pods. Returns: list of error strings"""

        if not pods_by_name:
            return ['No logging Elasticsearch masters were found. Is logging deployed?']

        # get ES cluster nodes
        node_cmd = self._build_es_curl_cmd(list(pods_by_name.keys())[0], 'https://localhost:9200/_nodes')
        cluster_node_data = self._exec_oc(node_cmd, [], task_vars)
        try:
            cluster_nodes = json.loads(cluster_node_data)['nodes']
        except (ValueError, KeyError):
            return [
                'Failed to query Elasticsearch for the list of ES nodes. The output was:\n' +
                cluster_node_data
            ]

        # Try to match all ES-reported node hosts to known pods.
        error_msgs = []
        for node in cluster_nodes.values():
            # Note that with 1.4/3.4 the pod IP may be used as the master name
            if not any(node['host'] in (pod_name, pod['status'].get('podIP'))
                       for pod_name, pod in pods_by_name.items()):
                error_msgs.append(
                    'The Elasticsearch cluster reports a member node "{node}"\n'
                    'that does not correspond to any known ES pod.'.format(node=node['host'])
                )

        return error_msgs

    def _check_es_cluster_health(self, pods_by_name, task_vars):
        """Exec into the elasticsearch pods and check the cluster health. Returns: list of errors"""
        error_msgs = []
        for pod_name in pods_by_name.keys():
            cluster_health_cmd = self._build_es_curl_cmd(pod_name, 'https://localhost:9200/_cluster/health?pretty=true')
            cluster_health_data = self._exec_oc(cluster_health_cmd, [], task_vars)
            try:
                health_res = json.loads(cluster_health_data)
                # A response without a 'status' field is as unusable as
                # malformed JSON, so treat it the same way.
                if not health_res or not health_res.get('status'):
                    raise ValueError()
            except ValueError:
                error_msgs.append(
                    'Could not retrieve cluster health status from logging ES pod "{pod}".\n'
                    'Response was:\n{output}'.format(pod=pod_name, output=cluster_health_data)
                )
                continue

            if health_res['status'] not in ['green', 'yellow']:
                error_msgs.append(
                    'Elasticsearch cluster health status is RED according to pod "{}"'.format(pod_name)
                )

        return error_msgs

    def _check_elasticsearch_diskspace(self, pods_by_name, task_vars):
        """
        Exec into an ES pod and query the diskspace on the persistent volume.
        Returns: list of errors
        """
        error_msgs = []
        for pod_name in pods_by_name.keys():
            df_cmd = 'exec {} -- df --output=ipcent,pcent /elasticsearch/persistent'.format(pod_name)
            disk_output = self._exec_oc(df_cmd, [], task_vars)
            lines = disk_output.splitlines()
            # expecting one header looking like 'IUse% Use%' and one body line
            body_re = r'\s*(\d+)%?\s+(\d+)%?\s*$'
            if len(lines) != 2 or len(lines[0].split()) != 2 or not re.match(body_re, lines[1]):
                error_msgs.append(
                    'Could not retrieve storage usage from logging ES pod "{pod}".\n'
                    'Response to `df` command was:\n{output}'.format(pod=pod_name, output=disk_output)
                )
                continue
            inode_pct, disk_pct = re.match(body_re, lines[1]).groups()

            # Thresholds are inventory-overridable percentages (strings).
            inode_pct_thresh = get_var(task_vars, 'openshift_check_efk_es_inode_pct', default='90')
            if int(inode_pct) >= int(inode_pct_thresh):
                error_msgs.append(
                    'Inode percent usage on the storage volume for logging ES pod "{pod}"\n'
                    '  is {pct}, greater than threshold {limit}.\n'
                    '  Note: threshold can be specified in inventory with {param}'.format(
                        pod=pod_name,
                        pct=str(inode_pct),
                        limit=str(inode_pct_thresh),
                        param='openshift_check_efk_es_inode_pct',
                    ))
            disk_pct_thresh = get_var(task_vars, 'openshift_check_efk_es_storage_pct', default='80')
            if int(disk_pct) >= int(disk_pct_thresh):
                error_msgs.append(
                    'Disk percent usage on the storage volume for logging ES pod "{pod}"\n'
                    '  is {pct}, greater than threshold {limit}.\n'
                    '  Note: threshold can be specified in inventory with {param}'.format(
                        pod=pod_name,
                        pct=str(disk_pct),
                        limit=str(disk_pct_thresh),
                        param='openshift_check_efk_es_storage_pct',
                    ))

        return error_msgs

    def _exec_oc(self, cmd_str, extra_args, task_vars):
        """Run the given oc command in the logging namespace via the base-class helper."""
        return super(Elasticsearch, self).exec_oc(
            self.execute_module,
            self.logging_namespace,
            cmd_str,
            extra_args,
            task_vars,
        )
Loading

0 comments on commit 2e53dbb

Please sign in to comment.