Skip to content
This repository has been archived by the owner on Sep 23, 2019. It is now read-only.

Commit

Permalink
SAMZA-850 : Yarn Job Validation Tool
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyuiscool authored and Navina Ramesh committed Mar 29, 2016
1 parent 9d6831b commit baf9faa
Show file tree
Hide file tree
Showing 13 changed files with 797 additions and 25 deletions.
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
<subpackage name="metrics">
<allow pkg="org.apache.samza.config" />
<allow pkg="org.apache.samza.util" />
<allow pkg="org.apache.samza.container" />
</subpackage>

<subpackage name="task">
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.metrics;

import java.util.Map;


/**
* A MetricsAccessor allows users to retrieve metric values, based on group name and metric name,
* though specific metrics system, such as JMX.
*/
public interface MetricsAccessor {
/**
* Get the values of a counter
* @param group Group for the counter, e.g. org.apache.samza.container.SamzaContainerMetrics
* @param counter Name of the counter, e.g. commit-calls
* @return A map of counter values, keyed by type, e.g. {"samza-container-0": 100L}
*/
Map<String, Long> getCounterValues(String group, String counter);

/**
* Get the values of a gauge
* @param group Group for the gauge, e.g. org.apache.samza.container.SamzaContainerMetrics
* @param gauge Name of the gauge, e.g. event-loop-utilization
* @param <T> Type of the gauge value, e.g. Double
* @return A map of gauge values, keyed by type, e.g. {"samza-container-0": 0.8}
*/
<T> Map<String, T> getGaugeValues(String group, String gauge);

/**
* Get the values of a timer
* @param group Group for the timer, e.g. org.apache.samza.container.SamzaContainerMetrics
* @param timer Name of the timer, e.g. choose-ns
* @return A map of timer values, keyed by type, e.g. {"samza-container-0": 10.5}
*/
Map<String, Double> getTimerValues(String group, String timer);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.metrics;

/**
* Thrown when the metrics validation fails. See {@link org.apache.samza.metrics.MetricsValidator}.
*/
public class MetricsValidationFailureException extends Exception {
public MetricsValidationFailureException(String message) {
super(message);
}
public MetricsValidationFailureException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.metrics;

import org.apache.samza.config.Config;


/**
* A MetricsValidator reads the job's metrics values by using the {@link org.apache.samza.metrics.MetricsAccessor},
* and validate them.
*/
public interface MetricsValidator {
/**
* Initialize with config.
* @param config Job config
*/
void init(Config config);

/**
* Validate the metrics values of a job
* @param accessor Accessor to get the metrics values through specific metrics system, e.g. JMX.
* @throws MetricsValidationFailureException Exception when the validation fails.
*/
void validate(MetricsAccessor accessor) throws MetricsValidationFailureException;

/**
* Complete validation. Final checks can be performed here.
* @throws MetricsValidationFailureException Exception when the validation fails.
*/
void complete() throws MetricsValidationFailureException;
}
14 changes: 14 additions & 0 deletions samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,20 @@ public String getContainerToHostValue(Integer containerId, String key) {
return mappings.get(key);
}

public Map<Integer, String> getAllContainerToHostValues(String key) {
if (localityManager == null) {
return Collections.EMPTY_MAP;
}
Map<Integer, String> allValues = new HashMap<>();
for (Map.Entry<Integer, Map<String, String>> entry : localityManager.readContainerLocality().entrySet()) {
String value = entry.getValue().get(key);
if (value != null) {
allValues.put(entry.getKey(), value);
}
}
return allValues;
}

private void populateContainerLocalityMappings() {
Map<Integer, Map<String, String>> allMappings = localityManager.readContainerLocality();
for (Integer containerId: containers.keySet()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.metrics;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* JMX metrics accessor.
* It connects to a container JMX url,and get metrics values by querying the MBeans.
*/
public class JmxMetricsAccessor implements MetricsAccessor {
private static final Logger log = LoggerFactory.getLogger(JmxMetricsAccessor.class);

private final String url;
private JMXConnector jmxc;

public JmxMetricsAccessor(String url) {
this.url = url;
}

public void connect() throws IOException {
JMXServiceURL jmxUrl = new JMXServiceURL(url);
jmxc = JMXConnectorFactory.connect(jmxUrl, null);
}

public void close() throws IOException {
jmxc.close();
}

private <T> Map<String, T> getMetricValues(String group, String metric, String attribute) {
try {
StringBuilder nameBuilder = new StringBuilder();
nameBuilder.append(JmxUtil.makeNameJmxSafe(group));
nameBuilder.append(":type=*,name=");
nameBuilder.append(JmxUtil.makeNameJmxSafe(metric));
ObjectName query = new ObjectName(nameBuilder.toString());
Map<String, T> values = new HashMap<>();
MBeanServerConnection conn = jmxc.getMBeanServerConnection();
for (ObjectName objName : conn.queryNames(query, null)) {
String type = objName.getKeyProperty("type");
T val = (T) conn.getAttribute(objName, attribute);
values.put(type, val);
}
return values;
} catch (Exception e) {
log.error(e.getMessage(), e);
return Collections.EMPTY_MAP;
}
}

@Override
public Map<String, Long> getCounterValues(String group, String counter) {
return getMetricValues(group, counter, "Count");
}

@Override
public <T> Map<String, T> getGaugeValues(String group, String gauge) {
return getMetricValues(group, gauge, "Value");
}

@Override
public Map<String, Double> getTimerValues(String group, String timer) {
return getMetricValues(group, timer, "AverageTime");
}
}
59 changes: 59 additions & 0 deletions samza-core/src/main/java/org/apache/samza/metrics/JmxUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.metrics;

import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Utility class to create JMX related objects
*/
public class JmxUtil {
private static final Logger log = LoggerFactory.getLogger(JmxUtil.class);

public static ObjectName getObjectName(String group, String name, String t) throws MalformedObjectNameException {
StringBuilder nameBuilder = new StringBuilder();
nameBuilder.append(makeNameJmxSafe(group));
nameBuilder.append(":type=");
nameBuilder.append(makeNameJmxSafe(t));
nameBuilder.append(",name=");
nameBuilder.append(makeNameJmxSafe(name));
ObjectName objName = new ObjectName(nameBuilder.toString());
log.debug("Resolved name for " + group + ", " + name + ", " + t + " to: " + objName);
return objName;
}

/*
* JMX only has ObjectName.quote, which is pretty nasty looking. This
* function escapes without quoting, using the rules outlined in:
* http://docs.oracle.com/javase/1.5.0/docs/api/javax/management/ObjectName.html
*/
public static String makeNameJmxSafe(String str) {
return str
.replace(",", "_")
.replace("=", "_")
.replace(":", "_")
.replace("\"", "_")
.replace("*", "_")
.replace("?", "_");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.samza.metrics.ReadableMetricsRegistry
import org.apache.samza.metrics.ReadableMetricsRegistryListener
import scala.collection.JavaConversions._
import org.apache.samza.metrics.MetricsVisitor
import org.apache.samza.metrics.JmxUtil._

class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging {
var sources = Map[ReadableMetricsRegistry, String]()
Expand Down Expand Up @@ -84,31 +85,6 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging {
}
}

def getObjectName(group: String, name: String, t: String) = {
val nameBuilder = new StringBuilder
nameBuilder.append(makeNameJmxSafe(group))
nameBuilder.append(":type=")
nameBuilder.append(makeNameJmxSafe(t))
nameBuilder.append(",name=")
nameBuilder.append(makeNameJmxSafe(name))
val objName = new ObjectName(nameBuilder.toString)
debug("Resolved name for %s, %s, %s to: %s" format (group, name, t, objName))
objName
}

/*
* JMX only has ObjectName.quote, which is pretty nasty looking. This
* function escapes without quoting, using the rules outlined in:
* http://docs.oracle.com/javase/1.5.0/docs/api/javax/management/ObjectName.html
*/
def makeNameJmxSafe(str: String) = str
.replace(",", "_")
.replace("=", "_")
.replace(":", "_")
.replace("\"", "_")
.replace("*", "_")
.replace("?", "_")

def registerBean(bean: MetricMBean) {
if (!server.isRegistered(bean.objectName)) {
debug("Registering MBean for %s." format bean.objectName)
Expand Down
Loading

0 comments on commit baf9faa

Please sign in to comment.