From 0fc5ac208cda86ac71f893ad2b52b381d3b9c266 Mon Sep 17 00:00:00 2001
From: Ragesh Rajagopalan
Date: Mon, 6 Feb 2017 17:44:47 +0530
Subject: [PATCH] Add new DistributedCacheLimit heuristic. (#187)

Jobs that put large files (> 500MB) in the distributed cache are flagged.
Files referenced by the following properties are considered:
mapreduce.job.cache.files
mapreduce.job.cache.archives
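
For context, these two properties are populated when a job registers distributed
cache entries. A minimal sketch of how that typically happens, assuming a standard
Hadoop 2.x Job setup (the class name and paths below are hypothetical):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class CacheUsageExample {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "cache-example");
        // Each call appends a URI to mapreduce.job.cache.files or
        // mapreduce.job.cache.archives, the properties this heuristic inspects.
        job.addCacheFile(new URI("/path/to/lookup.dat"));
        job.addCacheArchive(new URI("/path/to/bundle.tar.gz"));
        // ... set mapper/reducer and input/output paths, then submit.
      }
    }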
---
app-conf/HeuristicConf.xml | 10 ++
.../DistributedCacheLimitHeuristic.java | 157 ++++++++++++++++++
.../helpDistributedCacheLimit.scala.html | 27 +++
.../DistributedCacheLimitHeuristicTest.java | 137 +++++++++++++++
4 files changed, 331 insertions(+)
create mode 100644 app/com/linkedin/drelephant/mapreduce/heuristics/DistributedCacheLimitHeuristic.java
create mode 100644 app/views/help/mapreduce/helpDistributedCacheLimit.scala.html
create mode 100644 test/com/linkedin/drelephant/mapreduce/heuristics/DistributedCacheLimitHeuristicTest.java
diff --git a/app-conf/HeuristicConf.xml b/app-conf/HeuristicConf.xml
index cb2345cd7..21a00168a 100644
--- a/app-conf/HeuristicConf.xml
+++ b/app-conf/HeuristicConf.xml
@@ -156,6 +156,16 @@
     <viewname>views.html.help.mapreduce.helpException</viewname>
   </heuristic>
+  <heuristic>
+    <applicationtype>mapreduce</applicationtype>
+    <heuristicname>Distributed Cache Limit</heuristicname>
+    <classname>com.linkedin.drelephant.mapreduce.heuristics.DistributedCacheLimitHeuristic</classname>
+    <viewname>views.html.help.mapreduce.helpDistributedCacheLimit</viewname>
+    <params>
+      <distributed.cache.file.size.limit>500000000</distributed.cache.file.size.limit>
+    </params>
+  </heuristic>
+
diff --git a/app/com/linkedin/drelephant/mapreduce/heuristics/DistributedCacheLimitHeuristic.java b/app/com/linkedin/drelephant/mapreduce/heuristics/DistributedCacheLimitHeuristic.java
new file mode 100644
index 000000000..671e321ad
--- /dev/null
+++ b/app/com/linkedin/drelephant/mapreduce/heuristics/DistributedCacheLimitHeuristic.java
@@ -0,0 +1,157 @@
+package com.linkedin.drelephant.mapreduce.heuristics;
+
+import com.linkedin.drelephant.analysis.Heuristic;
+import com.linkedin.drelephant.analysis.HeuristicResult;
+import com.linkedin.drelephant.analysis.Severity;
+import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData;
+import com.linkedin.drelephant.mapreduce.data.MapReduceApplicationData;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Heuristic that flags jobs placing files larger than 500MB (configurable via the
+ * distributed.cache.file.size.limit param) in the distributed cache.
+ */
+public class DistributedCacheLimitHeuristic implements Heuristic<MapReduceApplicationData> {
+ private static final Logger logger = Logger.getLogger(DistributedCacheLimitHeuristic.class);
+ private static final String DISTRIBUTED_CACHE_FILE_SIZE_LIMIT_CONF = "distributed.cache.file.size.limit";
+ private static final String MAPREDUCE_JOB_CACHE_FILES_FILESIZES = "mapreduce.job.cache.files.filesizes";
+ private static final String MAPREDUCE_JOB_CACHE_ARCHIVES_FILESIZES = "mapreduce.job.cache.archives.filesizes";
+ private static final String MAPREDUCE_JOB_CACHE_FILES = "mapreduce.job.cache.files";
+ private static final String MAPREDUCE_JOB_CACHE_ARCHIVES = "mapreduce.job.cache.archives";
+ private static long distributedCacheFileSizeLimit = 500 * FileUtils.ONE_MB; // 500MB default
+ private HeuristicConfigurationData _heuristicConfData;
+
+ public DistributedCacheLimitHeuristic(HeuristicConfigurationData heuristicConfData) {
+ this._heuristicConfData = heuristicConfData;
+ loadParameters();
+ }
+
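+  /**
+   * Reads the optional distributed.cache.file.size.limit param from the heuristic
+   * configuration, overriding the 500MB default when the value parses as a long.
+   */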
+ private void loadParameters() {
+    Map<String, String> paramMap = _heuristicConfData.getParamMap();
+ String heuristicName = _heuristicConfData.getHeuristicName();
+
+ String cacheLimit = paramMap.get(DISTRIBUTED_CACHE_FILE_SIZE_LIMIT_CONF);
+ if (cacheLimit != null) {
+ try {
+ distributedCacheFileSizeLimit = Long.parseLong(cacheLimit);
+ logger.info(
+ heuristicName + " will use " + DISTRIBUTED_CACHE_FILE_SIZE_LIMIT_CONF + " with the following setting: "
+ + distributedCacheFileSizeLimit);
+ } catch (NumberFormatException e) {
+ logger
+ .warn("Error parsing " + DISTRIBUTED_CACHE_FILE_SIZE_LIMIT_CONF + " from the conf file. Check for typos...",
+ e);
+ }
+ }
+ }
+
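+  /**
+   * Checks the sizes of files and archives in the distributed cache against the
+   * configured limit. Returns null when the application did not succeed or did not
+   * use the distributed cache.
+   */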
+ @Override
+ public HeuristicResult apply(MapReduceApplicationData data) {
+ if (data == null || !data.getSucceeded()) {
+ return null;
+ }
+
+ Properties jobConf = data.getConf();
+ String cacheFiles = jobConf.getProperty(MAPREDUCE_JOB_CACHE_FILES, null);
+ String cacheFileSizes = jobConf.getProperty(MAPREDUCE_JOB_CACHE_FILES_FILESIZES, null);
+
+ HeuristicResult result = null;
+
+ if (cacheFiles != null && cacheFileSizes != null) {
+ result =
+ new HeuristicResult(_heuristicConfData.getClassName(), _heuristicConfData.getHeuristicName(), Severity.NONE,
+ 0);
+      List<String> cacheFilesList = new ArrayList<String>(Arrays.asList(cacheFiles.split(",")));
+      List<String> cacheFileSizesList = new ArrayList<String>(Arrays.asList(cacheFileSizes.split(",")));
+
+ int cacheFilesCount = cacheFilesList.size();
+ int cacheFileSizesCount = cacheFileSizesList.size();
+
+ if (cacheFilesCount != cacheFileSizesCount) {
+ result.setSeverity(Severity.MODERATE);
+ logger.warn("Mismatch in the number of files and their corresponding sizes for " + MAPREDUCE_JOB_CACHE_FILES);
+ result.addResultDetail(MAPREDUCE_JOB_CACHE_FILES, Integer.toString(cacheFilesCount));
+ result.addResultDetail(MAPREDUCE_JOB_CACHE_FILES_FILESIZES, Integer.toString(cacheFileSizesCount));
+ return result;
+ }
+
+      Map<String, String> cacheFileToSizeMap = new HashMap<String, String>();
+ for (int i = 0; i < cacheFilesCount; i++) {
+ cacheFileToSizeMap.put(cacheFilesList.get(i), cacheFileSizesList.get(i));
+ }
+
+ if (checkFileSizeLimit(result, cacheFileToSizeMap)) {
+ result.setSeverity(Severity.CRITICAL);
+ }
+ }
+
+ String archiveCacheFiles = jobConf.getProperty(MAPREDUCE_JOB_CACHE_ARCHIVES, null);
+ String archiveCacheFileSizes = jobConf.getProperty(MAPREDUCE_JOB_CACHE_ARCHIVES_FILESIZES, null);
+
+ if (archiveCacheFiles != null && archiveCacheFileSizes != null) {
+
+ if (result == null) {
+ result =
+ new HeuristicResult(_heuristicConfData.getClassName(), _heuristicConfData.getHeuristicName(), Severity.NONE,
+ 0);
+ }
+
+      List<String> archiveCacheFilesList = new ArrayList<String>(Arrays.asList(archiveCacheFiles.split(",")));
+      List<String> archiveCacheFileSizesList = new ArrayList<String>(Arrays.asList(archiveCacheFileSizes.split(",")));
+
+ int archiveCacheFilesCount = archiveCacheFilesList.size();
+ int archiveCacheFileSizesCount = archiveCacheFileSizesList.size();
+
+ if (archiveCacheFilesCount != archiveCacheFileSizesCount) {
+ result.setSeverity(Severity.MODERATE);
+ logger
+ .warn("Mismatch in the number of files and their corresponding sizes for " + MAPREDUCE_JOB_CACHE_ARCHIVES);
+ result.addResultDetail(MAPREDUCE_JOB_CACHE_ARCHIVES, Integer.toString(archiveCacheFilesCount));
+ result.addResultDetail(MAPREDUCE_JOB_CACHE_ARCHIVES_FILESIZES, Integer.toString(archiveCacheFileSizesCount));
+ return result;
+ }
+
+      Map<String, String> archiveCacheFileToSizeMap = new HashMap<String, String>();
+ for (int i = 0; i < archiveCacheFilesCount; i++) {
+ archiveCacheFileToSizeMap.put(archiveCacheFilesList.get(i), archiveCacheFileSizesList.get(i));
+ }
+
+ if (checkFileSizeLimit(result, archiveCacheFileToSizeMap)) {
+ result.setSeverity(Severity.CRITICAL);
+ }
+ }
+
+ return result;
+ }
+
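+  /**
+   * Adds a result detail for every cache entry whose size exceeds the configured limit.
+   * Returns true if at least one entry exceeds it.
+   */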
+  private boolean checkFileSizeLimit(HeuristicResult result, Map<String, String> cacheFileToSizeMap) {
+    boolean limitViolated = false;
+    for (String file : cacheFileToSizeMap.keySet()) {
+      long size = 0;
+      try {
+        size = Long.parseLong(cacheFileToSizeMap.get(file));
+      } catch (NumberFormatException e) {
+        logger.warn("Unable to parse file size value: " + cacheFileToSizeMap.get(file) + " for file: " + file);
+      }
+
+ if (size > distributedCacheFileSizeLimit) {
+ limitViolated = true;
+ result.addResultDetail(file, Long.toString(size));
+ }
+ }
+ return limitViolated;
+ }
+
+ @Override
+ public HeuristicConfigurationData getHeuristicConfData() {
+ return _heuristicConfData;
+ }
+}
diff --git a/app/views/help/mapreduce/helpDistributedCacheLimit.scala.html b/app/views/help/mapreduce/helpDistributedCacheLimit.scala.html
new file mode 100644
index 000000000..b2af0b457
--- /dev/null
+++ b/app/views/help/mapreduce/helpDistributedCacheLimit.scala.html
@@ -0,0 +1,27 @@
+@*
+* Copyright 2017 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the "License"); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*@
+
+<p>Jobs that put large files (> 500MB) in the distributed cache are flagged.
+Files referenced by the following properties are considered:</p>
+<ul>
+  <li>
+    mapreduce.job.cache.files
+  </li>
+  <li>
+    mapreduce.job.cache.archives
+  </li>
+</ul>
+
\ No newline at end of file
diff --git a/test/com/linkedin/drelephant/mapreduce/heuristics/DistributedCacheLimitHeuristicTest.java b/test/com/linkedin/drelephant/mapreduce/heuristics/DistributedCacheLimitHeuristicTest.java
new file mode 100644
index 000000000..b655a68d3
--- /dev/null
+++ b/test/com/linkedin/drelephant/mapreduce/heuristics/DistributedCacheLimitHeuristicTest.java
@@ -0,0 +1,137 @@
+package com.linkedin.drelephant.mapreduce.heuristics;
+
+import com.linkedin.drelephant.analysis.ApplicationType;
+import com.linkedin.drelephant.analysis.Heuristic;
+import com.linkedin.drelephant.analysis.HeuristicResult;
+import com.linkedin.drelephant.analysis.Severity;
+import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData;
+import com.linkedin.drelephant.mapreduce.data.MapReduceApplicationData;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Tests for the DistributedCacheLimitHeuristic class.
+ */
+public class DistributedCacheLimitHeuristicTest {
+  private static Map<String, String> paramMap = new HashMap<String, String>();
+ private static Properties jobConf = new Properties();
+ private static final String cacheFileList =
+ "/path/to/firstCacheFile,/path/to/secondCacheFile,/path/to/thirdCacheFile";
+ private static final String archiveCacheFileList =
+ "/path/to/firstArchiveCacheFile,/path/to/secondArchiveCacheFile,/path/to/thirdArchiveCacheFile";
+
+  private static Heuristic<MapReduceApplicationData> _heuristic = new DistributedCacheLimitHeuristic(
+ new HeuristicConfigurationData("test.heuristic", "test.class", "test.view", new ApplicationType("mapreduce"),
+ paramMap));
+
+ @Before
+ public void setup() {
+ paramMap.put("distributed.cache.file.size.limit", "500000000");
+ jobConf.setProperty("mapreduce.job.cache.files", cacheFileList);
+ jobConf.setProperty("mapreduce.job.cache.archives", archiveCacheFileList);
+ }
+
+ /**
+ * All cache file sizes are within the limit.
+ */
+ @Test
+ public void testHeuristicResult() {
+ jobConf.setProperty("mapreduce.job.cache.files.filesizes", "100,200,300");
+ jobConf.setProperty("mapreduce.job.cache.archives.filesizes", "400,500,600");
+
+ MapReduceApplicationData data = new MapReduceApplicationData().setJobConf(jobConf);
+ HeuristicResult result = _heuristic.apply(data);
+ assertTrue("Failed to match on expected severity", result.getSeverity() == Severity.NONE);
+ }
+
+ /**
+   * File sizes are not available for all the files in the cache.
+ */
+ @Test
+ public void testHeuristicResultCacheFilesAndSizeLengthMismatch() {
+ jobConf.setProperty("mapreduce.job.cache.files.filesizes", "100,200");
+ MapReduceApplicationData data = new MapReduceApplicationData().setJobConf(jobConf);
+ HeuristicResult result = _heuristic.apply(data);
+ assertTrue("Failed to match on expected severity", result.getSeverity() == Severity.MODERATE);
+ }
+
+ /**
+   * File sizes are not available for all the files in the archive cache.
+ */
+ @Test
+ public void testHeuristicResultArchiveCacheFilesAndSizeLengthMismatch() {
+ jobConf.setProperty("mapreduce.job.cache.files.filesizes", "100,200,300");
+ jobConf.setProperty("mapreduce.job.cache.archives.filesizes", "400,500");
+ MapReduceApplicationData data = new MapReduceApplicationData().setJobConf(jobConf);
+ HeuristicResult result = _heuristic.apply(data);
+ assertTrue("Failed to match on expected severity", result.getSeverity() == Severity.MODERATE);
+ }
+
+ /**
+ * File size limit exceeded for file in cache.
+ */
+ @Test
+ public void testHeuristicResultCacheFileLimitViolated() {
+ jobConf.setProperty("mapreduce.job.cache.files.filesizes", "100,200,600000000");
+ jobConf.setProperty("mapreduce.job.cache.archives.filesizes", "400,500,600");
+
+ MapReduceApplicationData data = new MapReduceApplicationData().setJobConf(jobConf);
+ HeuristicResult result = _heuristic.apply(data);
+ assertTrue("Failed to match on expected severity", result.getSeverity() == Severity.CRITICAL);
+ }
+
+ /**
+ * File size limit exceeded for file in archive cache.
+ */
+ @Test
+ public void testHeuristicResultArchiveCacheFileLimitViolated() {
+ jobConf.setProperty("mapreduce.job.cache.files.filesizes", "100,200,300");
+ jobConf.setProperty("mapreduce.job.cache.archives.filesizes", "400,500,600000000");
+
+ MapReduceApplicationData data = new MapReduceApplicationData().setJobConf(jobConf);
+ HeuristicResult result = _heuristic.apply(data);
+ assertTrue("Failed to match on expected severity", result.getSeverity() == Severity.CRITICAL);
+ }
+
+ /**
+   * Neither cache is used by the application.
+ */
+ @Test
+ public void testHeuristicResultNoDistributedCacheFiles() {
+ jobConf.remove("mapreduce.job.cache.files");
+ jobConf.remove("mapreduce.job.cache.archives");
+ MapReduceApplicationData data = new MapReduceApplicationData().setJobConf(jobConf);
+ HeuristicResult result = _heuristic.apply(data);
+ assertTrue("Failed to match on expected severity", result == null);
+ }
+
+ /**
+ * Cache files are not used by the application.
+ */
+ @Test
+ public void testHeuristicResultWithEmptyCacheFiles() {
+ jobConf.remove("mapreduce.job.cache.files");
+ jobConf.setProperty("mapreduce.job.cache.archives.filesizes", "400,500,600");
+ MapReduceApplicationData data = new MapReduceApplicationData().setJobConf(jobConf);
+ HeuristicResult result = _heuristic.apply(data);
+ assertTrue("Failed to match on expected severity", result.getSeverity() == Severity.NONE);
+ }
+
+ /**
+   * Archive cache is not used by the application.
+ */
+ @Test
+ public void testHeuristicResultWithEmptyArchiveCacheFiles() {
+ jobConf.remove("mapreduce.job.cache.archives");
+ jobConf.setProperty("mapreduce.job.cache.files.filesizes", "100,200,300");
+ MapReduceApplicationData data = new MapReduceApplicationData().setJobConf(jobConf);
+ HeuristicResult result = _heuristic.apply(data);
+ assertTrue("Failed to match on expected severity", result.getSeverity() == Severity.NONE);
+ }
+}