Revert "AMBARI-23258. Ambari-agent logs are messy/hard to read (aonis…
Browse files Browse the repository at this point in the history
…huk)"

This reverts commit 668e397.
aonishuk committed Mar 16, 2018
1 parent 668e397 commit 82f1925
Showing 21 changed files with 35 additions and 158 deletions.
17 changes: 1 addition & 16 deletions ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py
@@ -59,7 +59,7 @@ def run(self):
changed_alerts = self.get_changed_alerts(alerts)

if changed_alerts and self.initializer_module.is_registered:
self.initializer_module.connection.send(message=changed_alerts, destination=Constants.ALERTS_STATUS_REPORTS_ENDPOINT, log_message_function=AlertStatusReporter.log_sending)
self.initializer_module.connection.send(message=changed_alerts, destination=Constants.ALERTS_STATUS_REPORTS_ENDPOINT)
self.save_results(changed_alerts)
except ConnectionIsAlreadyClosed: # server and agent disconnected during sending data. Not an issue
pass
@@ -93,18 +93,3 @@ def get_changed_alerts(self, alerts):
changed_alerts.append(alert)

return changed_alerts

@staticmethod
def log_sending(message_dict):
"""
Returned dictionary will be used while logging sent alert status.
Used because full dict is too big for logs and should be shortened
"""
try:
for alert_status in message_dict:
if 'text' in alert_status:
alert_status['text'] = '...'
except KeyError:
pass

return message_dict
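
The hunks above drop the log_message_function hook that elided bulky alert text before the payload was logged. A minimal standalone sketch of that pattern, with illustrative names rather than Ambari's actual API:

import copy
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("sketch")

def shorten_alerts_for_log(alerts):
    # Return a copy of the alert list with bulky 'text' fields replaced by '...'.
    shortened = copy.deepcopy(alerts)
    for alert in shortened:
        if isinstance(alert, dict) and 'text' in alert:
            alert['text'] = '...'
    return shortened

def send(message, destination, log_message_function=lambda m: m):
    # Log a shortened copy, but hand the full message to the transport.
    logger.info("Event to server at %s: %s", destination,
                json.dumps(log_message_function(copy.deepcopy(message))))
    # ... the real transport (e.g. a STOMP connection) would send `message` here ...

if __name__ == '__main__':
    alerts = [{'name': 'ams_metrics_monitor_process', 'state': 'OK', 'text': 'x' * 4096}]
    send(alerts, '/reports/alerts_status', log_message_function=shorten_alerts_for_log)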
42 changes: 12 additions & 30 deletions ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -18,7 +18,6 @@
limitations under the License.
'''

import os
import logging
import threading
import copy
@@ -77,7 +76,7 @@ def force_update_to_server(self, reports_dict):
return False

try:
self.initializer_module.connection.send(message={'clusters':reports_dict}, destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT, log_message_function=CommandStatusDict.log_sending)
self.initializer_module.connection.send(message={'clusters':reports_dict}, destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT)
return True
except ConnectionIsAlreadyClosed:
return False
@@ -132,18 +131,17 @@ def generate_in_progress_report(self, command, report):
and populates other fields of report.
"""
from ActionQueue import ActionQueue

files_to_read = [report['tmpout'], report['tmperr'], report['structuredOut']]
files_content = ['...', '...', '{}']

for i in xrange(len(files_to_read)):
filename = files_to_read[i]
if os.path.exists(filename):
with open(filename, 'r') as fp:
files_content[i] = fp.read()

tmpout, tmperr, tmpstructuredout = files_content

try:
tmpout = open(report['tmpout'], 'r').read()
tmperr = open(report['tmperr'], 'r').read()
except Exception, err:
logger.warn(err)
tmpout = '...'
tmperr = '...'
try:
tmpstructuredout = open(report['structuredOut'], 'r').read()
except Exception:
tmpstructuredout = '{}'
grep = Grep()
output = grep.tail(tmpout, Grep.OUTPUT_LAST_LINES)
inprogress = self.generate_report_template(command)
@@ -171,21 +169,5 @@ def generate_report_template(self, command):
'roleCommand': command['roleCommand']
}
return stub

@staticmethod
def log_sending(message_dict):
"""
Returned dictionary will be used while logging sent component status.
Used because full dict is too big for logs and should be shortened
"""
try:
for cluster_id in message_dict['clusters']:
for command_status in message_dict['clusters'][cluster_id]:
if 'stdout' in command_status:
command_status['stdout'] = '...'
except KeyError:
pass

return message_dict
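
The generate_in_progress_report hunk above shows two ways of reading the command's tmpout/tmperr/structuredOut files: one checks os.path.exists before reading, the other reads and falls back to placeholder values on failure. A standalone sketch of both styles, with illustrative helper names:

import os

def read_or_default(path, default):
    # EAFP variant: try to read, fall back to a default if the file is unreadable.
    try:
        with open(path, 'r') as fp:
            return fp.read()
    except (IOError, OSError):
        return default

def read_if_exists(path, default):
    # LBYL variant: only read files that exist.
    if os.path.exists(path):
        with open(path, 'r') as fp:
            return fp.read()
    return default

if __name__ == '__main__':
    report = {'tmpout': '/tmp/does-not-exist.out'}
    print(repr(read_or_default(report['tmpout'], '...')))
    print(repr(read_if_exists(report['tmpout'], '...')))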


@@ -241,7 +241,7 @@ def getConfigTypeCredentials(self, commandJson):
config.pop(value_name, None)
return configtype_credentials

def generateJceks(self, commandJson):
def qJceks(self, commandJson):
"""
Generates the JCEKS file with passwords for the service specified in commandJson
@@ -363,7 +363,7 @@ def runCommand(self, command_header, tmpoutfile, tmperrfile, forced_command_name
# generate the JCEKS file for the configurations.
credentialStoreEnabled = False
if 'serviceLevelParams' in command and 'credentialStoreEnabled' in command['serviceLevelParams']:
credentialStoreEnabled = command['serviceLevelParams']['credentialStoreEnabled']
credentialStoreEnabled = (command['serviceLevelParams']['credentialStoreEnabled'] == "true")

if credentialStoreEnabled == True:
if 'commandBeingRetried' not in command['agentLevelParams'] or command['agentLevelParams']['commandBeingRetried'] != "true":
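
One side of the runCommand hunk assigns the raw credentialStoreEnabled value, the other compares it to the string "true". A standalone illustration of why that distinction matters when the flag arrives as a string rather than a real boolean (the payload below is made up):

import json

payload = json.loads('{"serviceLevelParams": {"credentialStoreEnabled": "false"}}')
raw = payload['serviceLevelParams']['credentialStoreEnabled']

print(bool(raw))        # True  -- any non-empty string is truthy, even "false"
print(raw == "true")    # False -- explicit comparison gives the intended answer
print(raw == True)      # False -- a string never equals the boolean True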
16 changes: 4 additions & 12 deletions ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -93,11 +93,7 @@ def run(self):
logger.debug("Heartbeat response is {0}".format(response))
self.handle_heartbeat_reponse(response)
except Exception as ex:
if isinstance(ex, (ConnectionIsAlreadyClosed)):
logger.info("Connection was closed. Re-running the registration")
elif isinstance(ex, (socket_error)):
logger.info("Connection error \"{0}\". Re-running the registration".format(str(ex)))
else:
if not isinstance(ex, (socket_error, ConnectionIsAlreadyClosed)):
logger.exception("Exception in HeartbeatThread. Re-running the registration")

self.unregister()
@@ -129,7 +125,7 @@ def register(self):

for endpoint, cache, listener in self.post_registration_requests:
# should not hang forever on these requests
response = self.blocking_request({'hash': cache.hash}, endpoint, log_handler=listener.get_log_message)
response = self.blocking_request({'hash': cache.hash}, endpoint)
try:
listener.on_event({}, response)
except:
@@ -231,16 +227,12 @@ def subscribe_to_topics(self, topics_list):
for topic_name in topics_list:
self.connection.subscribe(destination=topic_name, id='sub', ack='client-individual')

def blocking_request(self, message, destination, log_handler=None, timeout=REQUEST_RESPONSE_TIMEOUT):
def blocking_request(self, message, destination, timeout=REQUEST_RESPONSE_TIMEOUT):
"""
Send a request to the server and wait for its response. The response is matched by correlation_id.
"""
def presend_hook(correlation_id):
if log_handler:
self.server_responses_listener.logging_handlers[str(correlation_id)] = log_handler

try:
correlation_id = self.connection.send(message=message, destination=destination, presend_hook=presend_hook)
correlation_id = self.connection.send(message=message, destination=destination)
except ConnectionIsAlreadyClosed:
# this happens when trying to connect to broken connection. Happens if ambari-server is restarted.
logger.warn("Connection failed while trying to connect to {0}".format(destination))
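
blocking_request above sends a message and waits for the reply that carries the same correlation id. A minimal standalone sketch of that request/response correlation, not Ambari's implementation (the transport call is stubbed out and the server reply is simulated):

import itertools
import threading

class ResponseListener(object):
    def __init__(self):
        self._events = {}
        self._responses = {}

    def on_event(self, correlation_id, message):
        # Called by the receiving thread when a response arrives.
        self._responses[correlation_id] = message
        self._events.setdefault(correlation_id, threading.Event()).set()

    def wait_for(self, correlation_id, timeout):
        event = self._events.setdefault(correlation_id, threading.Event())
        event.wait(timeout)
        return self._responses.pop(correlation_id, None)

_ids = itertools.count()
listener = ResponseListener()

def blocking_request(message, destination, timeout=10):
    correlation_id = next(_ids)
    # A real implementation would send over the wire here, e.g.
    # connection.send(destination, message, correlationId=correlation_id)
    # For the sketch, pretend the server answers shortly on another thread:
    threading.Timer(0.1, listener.on_event, args=(correlation_id, {'status': 'OK'})).start()
    return listener.wait_for(correlation_id, timeout)

if __name__ == '__main__':
    print(blocking_request({'hash': 'abc'}, '/agents/registration'))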
@@ -31,7 +31,7 @@
from resource_management.libraries.functions.get_port_from_url import get_port_from_url
from ambari_commons import inet_utils

logger = logging.getLogger(__name__)
logger = logging.getLogger()

AMS_METRICS_GET_URL = "/ws/v1/timeline/metrics?%s"

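
The hunk above swaps between a module-level named logger and the root logger. A quick standalone illustration of the difference (the logger name below is just an example of what getLogger(__name__)-style naming gives):

import logging

root_logger = logging.getLogger()                         # the root logger
named_logger = logging.getLogger("ambari_alerts.metric")  # a module-style named logger

print(root_logger.name)                    # 'root'
print(named_logger.name)                   # 'ambari_alerts.metric'
print(named_logger.parent is root_logger)  # True: records still propagate up to root handlers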
@@ -115,7 +115,7 @@ def _collect(self):
matchObj = re.match( r'((.*)services(.*)package)', self.path_to_script)
if matchObj:
basedir = matchObj.group(1)
with Environment(basedir, tmp_dir=AGENT_TMP_DIR, logger=logging.getLogger('alerts')) as env:
with Environment(basedir, tmp_dir=AGENT_TMP_DIR, logger=logging.getLogger('ambari_alerts')) as env:
result = cmd_module.execute(configurations, self.parameters, self.host_name)
else:
result = cmd_module.execute(configurations, self.parameters, self.host_name)
@@ -60,18 +60,3 @@ def on_event(self, headers, message):

def get_handled_path(self):
return Constants.ALERTS_DEFINITIONS_TOPIC

def get_log_message(self, headers, message_json):
"""
This string will be used to log received message of this type.
Usually should be used if full dict is too big for logs and should be shortened or made more readable
"""
try:
for cluster_id in message_json['clusters']:
for alert_definition in message_json['clusters'][cluster_id]['alertDefinitions']:
if 'source' in alert_definition:
alert_definition['source'] = '...'
except KeyError:
pass

return super(AlertDefinitionsEventListener, self).get_log_message(headers, message_json)
@@ -59,21 +59,4 @@ def on_event(self, headers, message):
self.action_queue.put(commands)

def get_handled_path(self):
return Constants.COMMANDS_TOPIC

def get_log_message(self, headers, message_json):
"""
This string will be used to log received message of this type.
Usually should be used if full dict is too big for logs and should be shortened or made more readable
"""
try:
for cluster_id in message_json['clusters']:
for command in message_json['clusters'][cluster_id]['commands']:
if 'repositoryFile' in command:
command['repositoryFile'] = '...'
if 'commandParams' in command:
command['commandParams'] = '...'
except KeyError:
pass

return super(CommandsEventListener, self).get_log_message(headers, message_json)
return Constants.COMMANDS_TOPIC
@@ -49,18 +49,4 @@ def on_event(self, headers, message):
self.configuration_cache.rewrite_cache(message['clusters'], message['hash'])

def get_handled_path(self):
return Constants.CONFIGURATIONS_TOPIC

def get_log_message(self, headers, message_json):
"""
This string will be used to log received message of this type.
Usually should be used if full dict is too big for logs and should be shortened or made more readable
"""
try:
for cluster_id in message_json['clusters']:
for config_type in message_json['clusters'][cluster_id]['configurations']:
message_json['clusters'][cluster_id]['configurations'][config_type] = '...'
except KeyError:
pass

return super(ConfigurationEventListener, self).get_log_message(headers, message_json)
return Constants.CONFIGURATIONS_TOPIC
@@ -33,7 +33,6 @@ class ServerResponsesListener(EventListener):
"""
def __init__(self):
self.listener_functions = {}
self.logging_handlers = {}
self.reset_responses()

def on_event(self, headers, message):
@@ -64,13 +63,6 @@ def get_log_message(self, headers, message_json):
"""
if Constants.CORRELATION_ID_STRING in headers:
correlation_id = headers[Constants.CORRELATION_ID_STRING]

if correlation_id in self.logging_handlers:
message_json = self.logging_handlers[correlation_id](headers, message_json)
if message_json.startswith(" :"):
message_json = message_json[2:]
del self.logging_handlers[correlation_id]

return " (correlation_id={0}): {1}".format(correlation_id, message_json)
return str(message_json)

@@ -56,21 +56,4 @@ def on_event(self, headers, message):
logger.error("Unknown event type '{0}' for topology event")

def get_handled_path(self):
return Constants.TOPOLOGIES_TOPIC

def get_log_message(self, headers, message_json):
"""
This string will be used to log received message of this type.
Usually should be used if full dict is too big for logs and should be shortened or made more readable
"""
try:
for cluster_id in message_json['clusters']:
for component_info in message_json['clusters'][cluster_id]['components']:
if 'componentLevelParams' in component_info:
component_info['componentLevelParams'] = '...'
if 'commandParams' in component_info:
component_info['commandParams'] = '...'
except KeyError:
pass

return super(TopologyEventListener, self).get_log_message(headers, message_json)
return Constants.TOPOLOGIES_TOPIC
@@ -20,7 +20,6 @@
import ambari_simplejson as json
import ambari_stomp
import logging
import copy

logger = logging.getLogger(__name__)

@@ -45,8 +44,7 @@ def on_message(self, headers, message):
except ValueError:
logger.exception("Received from server event is not a valid message json. Message is:\n{0}".format(message))
return

logger.info("Event from server at {0}{1}".format(destination, self.get_log_message(headers, copy.deepcopy(message_json))))
logger.info("Event from server at {0}{1}".format(destination, self.get_log_message(headers, message_json)))
try:
self.on_event(headers, message_json)
except:
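
One side of the on_message hunk deep-copies the incoming message before handing it to get_log_message; without the copy, a log-shortening helper that overwrites fields with '...' would also corrupt the payload later passed to on_event. A standalone illustration of that hazard (the dict contents are made up):

import copy

def shorten_for_log(message):
    # In-place mutation: fine on a copy, destructive on the original.
    message['configurations'] = '...'
    return message

original = {'hash': 'h1', 'configurations': {'core-site': {'fs.defaultFS': 'hdfs://c1'}}}

shorten_for_log(copy.deepcopy(original))    # safe: original is untouched
print(original['configurations'])           # still the full nested dict

shorten_for_log(original)                   # unsafe: payload is now elided
print(original['configurations'])           # '...'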
4 changes: 0 additions & 4 deletions ambari-agent/src/main/python/ambari_agent/main.py
@@ -103,8 +103,6 @@ def mp_locked_init(self, *a, **kw):
from HeartbeatHandlers import bind_signal_handlers
from ambari_commons.constants import AMBARI_SUDO_BINARY
from resource_management.core.logger import Logger
#from resource_management.core.resources.system import File
#from resource_management.core.environment import Environment

from ambari_agent import HeartbeatThread
from ambari_agent.InitializerModule import InitializerModule
@@ -418,8 +416,6 @@ def main(initializer_module, heartbeat_stop_callback=None):
setup_logging(apscheduler_logger, AmbariConfig.AmbariConfig.getAlertsLogFile(), logging_level)
setup_logging(apscheduler_logger_global, AmbariConfig.AmbariConfig.getAlertsLogFile(), logging_level)
Logger.initialize_logger('resource_management', logging_level=logging_level)
#with Environment() as env:
# File("/abc")

# init data, once loggers are setup to see exceptions/errors of initialization.
initializer_module.init()
9 changes: 2 additions & 7 deletions ambari-agent/src/main/python/ambari_agent/security.py
@@ -20,7 +20,6 @@
import httplib
import urllib2
import socket
import copy
import ssl
import os
import logging
@@ -106,16 +105,12 @@ def __init__(self, *args, **kwargs):
self.correlation_id = -1
WsConnection.__init__(self, *args, **kwargs)

def send(self, destination, message, content_type=None, headers=None, log_message_function=lambda x:x, presend_hook=None, **keyword_headers):
def send(self, destination, message, content_type=None, headers=None, **keyword_headers):
with self.lock:
self.correlation_id += 1
correlation_id = self.correlation_id

if presend_hook:
presend_hook(self.correlation_id)

logged_message = log_message_function(copy.deepcopy(message))
logger.info("Event to server at {0} (correlation_id={1}): {2}".format(destination, correlation_id, logged_message))
logger.info("Event to server at {0} (correlation_id={1}): {2}".format(destination, correlation_id, message))

body = json.dumps(message)
WsConnection.send(self, destination, body, content_type=content_type, headers=headers, correlationId=correlation_id, **keyword_headers)
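
send() above allocates an increasing correlation id under a lock, logs the outgoing event, and forwards the JSON body to the underlying STOMP connection. A standalone sketch of that shape, with the actual transport call stubbed out (names are illustrative):

import json
import logging
import threading

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("sketch")

class Connection(object):
    def __init__(self):
        self.lock = threading.RLock()
        self.correlation_id = -1

    def send(self, destination, message, **keyword_headers):
        with self.lock:
            # The lock guarantees concurrent senders never reuse a correlation id.
            self.correlation_id += 1
            correlation_id = self.correlation_id
            logger.info("Event to server at %s (correlation_id=%s): %s",
                        destination, correlation_id, message)
            body = json.dumps(message)
            # A real implementation forwards `body` to the underlying connection here, e.g.
            # WsConnection.send(self, destination, body, correlationId=correlation_id, **keyword_headers)
        return correlation_id

if __name__ == '__main__':
    conn = Connection()
    print(conn.send('/reports/alerts_status', {'alerts': []}))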
4 changes: 2 additions & 2 deletions ambari-common/src/main/python/ambari_stomp/transport.py
@@ -188,7 +188,7 @@ def process_frame(self, f, frame_str):
if log.isEnabledFor(logging.DEBUG):
log.debug("Received frame: %r, headers=%r, body=%r", f.cmd, f.headers, f.body)
else:
log.debug("Received frame: %r, headers=%r, len(body)=%r", f.cmd, f.headers, utils.length(f.body))
log.info("Received frame: %r, headers=%r, len(body)=%r", f.cmd, f.headers, utils.length(f.body))
self.notify(frame_type, f.headers, f.body)
else:
log.warning("Unknown response frame type: '%s' (frame length was %d)", frame_type, utils.length(frame_str))
@@ -268,7 +268,7 @@ def transmit(self, frame):
if log.isEnabledFor(logging.DEBUG):
log.debug("Sending frame: %s", lines)
else:
log.debug("Sending frame: %r, headers=%r", frame.cmd or "heartbeat", frame.headers)
log.info("Sending frame: %r, headers=%r", frame.cmd or "heartbeat", frame.headers)

self.send(encode(packed_frame))

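
Both transport.py hunks keep the same gate: the full frame body is dumped only when DEBUG is enabled, otherwise just the headers and the body length are logged. A standalone sketch of that gating (logger name and sample frame are illustrative):

import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("sketch.transport")

def log_received_frame(cmd, headers, body):
    if log.isEnabledFor(logging.DEBUG):
        # Verbose form: include the whole body.
        log.debug("Received frame: %r, headers=%r, body=%r", cmd, headers, body)
    else:
        # Compact form: headers plus body length only.
        log.info("Received frame: %r, headers=%r, len(body)=%r", cmd, headers, len(body))

if __name__ == '__main__':
    log_received_frame('MESSAGE', {'destination': '/events/topologies'}, '{"clusters": {}}' * 100)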
@@ -184,7 +184,6 @@ public ExecutionCommand() {
* the deprecated use of {@link KeyNames#REPO_INFO}
*/
@SerializedName("repositoryFile")
@JsonIgnore
private CommandRepository commandRepository;

@SerializedName("componentVersionMap")
@@ -231,7 +231,7 @@ public List<AlertDefinitionEntity> findByService(long clusterId,

/**
* Gets all alert definitions for the specified services that do not have a
* component and do not belong to AGGREGATE source type. These definitions are assumed to be run on the master hosts.
* component. These definitions are assumed to be run on the master hosts.
*
* @param clusterId
* the ID of the cluster.
@@ -71,8 +71,7 @@
}),
@NamedQuery(name = "AlertDefinitionEntity.findByService", query = "SELECT ad FROM AlertDefinitionEntity ad WHERE ad.serviceName = :serviceName AND ad.clusterId = :clusterId"),
@NamedQuery(name = "AlertDefinitionEntity.findByServiceAndComponent", query = "SELECT ad FROM AlertDefinitionEntity ad WHERE ad.serviceName = :serviceName AND ad.componentName = :componentName AND ad.clusterId = :clusterId"),
@NamedQuery(name = "AlertDefinitionEntity.findByServiceMaster", query = "SELECT ad FROM AlertDefinitionEntity ad WHERE ad.serviceName IN :services AND ad.scope = :scope AND ad.clusterId = :clusterId AND ad.componentName IS NULL" +
" AND ad.sourceType <> org.apache.ambari.server.state.alert.SourceType.AGGREGATE"),
@NamedQuery(name = "AlertDefinitionEntity.findByServiceMaster", query = "SELECT ad FROM AlertDefinitionEntity ad WHERE ad.serviceName IN :services AND ad.scope = :scope AND ad.clusterId = :clusterId AND ad.componentName IS NULL"),
@NamedQuery(name = "AlertDefinitionEntity.findByIds", query = "SELECT ad FROM AlertDefinitionEntity ad WHERE ad.definitionId IN :definitionIds"),
@NamedQuery(name = "AlertDefinitionEntity.findBySourceType", query = "SELECT ad FROM AlertDefinitionEntity ad WHERE ad.clusterId = :clusterId AND ad.sourceType = :sourceType")})
public class AlertDefinitionEntity {