Added new DistributedCacheLimit heuristic. (linkedin#187)
Jobs which put large files (> 500MB) in the distributed cache are flagged.
Files configured via the following properties are considered (see the sketch after the list):
  mapreduce.job.cache.files
  mapreduce.job.cache.archives
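
For illustration, a minimal sketch (in Java, with hypothetical paths and sizes) of a job configuration carrying these properties; the parallel *.filesizes properties hold each entry's size in bytes:

import java.util.Properties;

public class DistributedCacheConfExample {
  public static void main(String[] args) {
    // Hypothetical job configuration showing the properties the heuristic reads.
    Properties jobConf = new Properties();
    jobConf.setProperty("mapreduce.job.cache.files", "/path/to/lookup.dat,/path/to/dict.txt");
    // Comma-separated sizes in bytes, parallel to the file list above.
    jobConf.setProperty("mapreduce.job.cache.files.filesizes", "600000000,1048576");
    jobConf.setProperty("mapreduce.job.cache.archives", "/path/to/bundle.tar.gz");
    jobConf.setProperty("mapreduce.job.cache.archives.filesizes", "100000000");
    // The 600000000-byte cache file exceeds the 500000000-byte default limit,
    // so the heuristic would flag this job as CRITICAL.
  }
}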
rajagopr authored and akshayrai committed Feb 6, 2017
1 parent 29844c9 commit 0fc5ac2
Showing 4 changed files with 331 additions and 0 deletions.
10 changes: 10 additions & 0 deletions app-conf/HeuristicConf.xml
@@ -156,6 +156,16 @@
<viewname>views.html.help.mapreduce.helpException</viewname>
</heuristic>

<heuristic>
<applicationtype>mapreduce</applicationtype>
<heuristicname>Distributed Cache Limit</heuristicname>
<classname>com.linkedin.drelephant.mapreduce.heuristics.DistributedCacheLimitHeuristic</classname>
<viewname>views.html.help.mapreduce.helpDistributedCacheLimit</viewname>
<params>
<distributed.cache.file.size.limit>500000000</distributed.cache.file.size.limit>
</params>
</heuristic>


<!-- SPARK HEURISTICS -->

@@ -0,0 +1,157 @@
package com.linkedin.drelephant.mapreduce.heuristics;

import com.linkedin.drelephant.analysis.Heuristic;
import com.linkedin.drelephant.analysis.HeuristicResult;
import com.linkedin.drelephant.analysis.Severity;
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData;
import com.linkedin.drelephant.mapreduce.data.MapReduceApplicationData;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;


/**
* Rule that flags jobs which put files larger than 500MB (the configurable default) in the distributed cache.
*/
public class DistributedCacheLimitHeuristic implements Heuristic<MapReduceApplicationData> {
private static final Logger logger = Logger.getLogger(DistributedCacheLimitHeuristic.class);
private static final String DISTRIBUTED_CACHE_FILE_SIZE_LIMIT_CONF = "distributed.cache.file.size.limit";
private static final String MAPREDUCE_JOB_CACHE_FILES_FILESIZES = "mapreduce.job.cache.files.filesizes";
private static final String MAPREDUCE_JOB_CACHE_ARCHIVES_FILESIZES = "mapreduce.job.cache.archives.filesizes";
private static final String MAPREDUCE_JOB_CACHE_FILES = "mapreduce.job.cache.files";
private static final String MAPREDUCE_JOB_CACHE_ARCHIVES = "mapreduce.job.cache.archives";
private static long distributedCacheFileSizeLimit = 500 * FileUtils.ONE_MB; // 500MB default
private HeuristicConfigurationData _heuristicConfData;

public DistributedCacheLimitHeuristic(HeuristicConfigurationData heuristicConfData) {
this._heuristicConfData = heuristicConfData;
loadParameters();
}

private void loadParameters() {
Map<String, String> paramMap = _heuristicConfData.getParamMap();
String heuristicName = _heuristicConfData.getHeuristicName();

String cacheLimit = paramMap.get(DISTRIBUTED_CACHE_FILE_SIZE_LIMIT_CONF);
if (cacheLimit != null) {
try {
distributedCacheFileSizeLimit = Long.parseLong(cacheLimit);
logger.info(
heuristicName + " will use " + DISTRIBUTED_CACHE_FILE_SIZE_LIMIT_CONF + " with the following setting: "
+ distributedCacheFileSizeLimit);
} catch (NumberFormatException e) {
logger
.warn("Error parsing " + DISTRIBUTED_CACHE_FILE_SIZE_LIMIT_CONF + " from the conf file. Check for typos...",
e);
}
}
}

@Override
public HeuristicResult apply(MapReduceApplicationData data) {
if (data == null || !data.getSucceeded()) {
return null;
}

Properties jobConf = data.getConf();
String cacheFiles = jobConf.getProperty(MAPREDUCE_JOB_CACHE_FILES, null);
String cacheFileSizes = jobConf.getProperty(MAPREDUCE_JOB_CACHE_FILES_FILESIZES, null);

HeuristicResult result = null;

if (cacheFiles != null && cacheFileSizes != null) {
result =
new HeuristicResult(_heuristicConfData.getClassName(), _heuristicConfData.getHeuristicName(), Severity.NONE,
0);
List<String> cacheFilesList = new ArrayList<String>(Arrays.asList(cacheFiles.split(",")));
List<String> cacheFileSizesList = new ArrayList<String>(Arrays.asList(cacheFileSizes.split(",")));

int cacheFilesCount = cacheFilesList.size();
int cacheFileSizesCount = cacheFileSizesList.size();

if (cacheFilesCount != cacheFileSizesCount) {
result.setSeverity(Severity.MODERATE);
logger.warn("Mismatch in the number of files and their corresponding sizes for " + MAPREDUCE_JOB_CACHE_FILES);
result.addResultDetail(MAPREDUCE_JOB_CACHE_FILES, Integer.toString(cacheFilesCount));
result.addResultDetail(MAPREDUCE_JOB_CACHE_FILES_FILESIZES, Integer.toString(cacheFileSizesCount));
return result;
}

Map<String, String> cacheFileToSizeMap = new HashMap<String, String>();
for (int i = 0; i < cacheFilesCount; i++) {
cacheFileToSizeMap.put(cacheFilesList.get(i), cacheFileSizesList.get(i));
}

if (checkFileSizeLimit(result, cacheFileToSizeMap)) {
result.setSeverity(Severity.CRITICAL);
}
}

String archiveCacheFiles = jobConf.getProperty(MAPREDUCE_JOB_CACHE_ARCHIVES, null);
String archiveCacheFileSizes = jobConf.getProperty(MAPREDUCE_JOB_CACHE_ARCHIVES_FILESIZES, null);

if (archiveCacheFiles != null && archiveCacheFileSizes != null) {

if (result == null) {
result =
new HeuristicResult(_heuristicConfData.getClassName(), _heuristicConfData.getHeuristicName(), Severity.NONE,
0);
}

List<String> archiveCacheFilesList = new ArrayList<String>(Arrays.asList(archiveCacheFiles.split(",")));
List<String> archiveCacheFileSizesList = new ArrayList<String>(Arrays.asList(archiveCacheFileSizes.split(",")));

int archiveCacheFilesCount = archiveCacheFilesList.size();
int archiveCacheFileSizesCount = archiveCacheFileSizesList.size();

if (archiveCacheFilesCount != archiveCacheFileSizesCount) {
// Do not downgrade a CRITICAL result from the cache-files check above.
if (result.getSeverity() != Severity.CRITICAL) {
result.setSeverity(Severity.MODERATE);
}
logger
.warn("Mismatch in the number of files and their corresponding sizes for " + MAPREDUCE_JOB_CACHE_ARCHIVES);
result.addResultDetail(MAPREDUCE_JOB_CACHE_ARCHIVES, Integer.toString(archiveCacheFilesCount));
result.addResultDetail(MAPREDUCE_JOB_CACHE_ARCHIVES_FILESIZES, Integer.toString(archiveCacheFileSizesCount));
return result;
}

Map<String, String> archiveCacheFileToSizeMap = new HashMap<String, String>();
for (int i = 0; i < archiveCacheFilesCount; i++) {
archiveCacheFileToSizeMap.put(archiveCacheFilesList.get(i), archiveCacheFileSizesList.get(i));
}

if (checkFileSizeLimit(result, archiveCacheFileToSizeMap)) {
result.setSeverity(Severity.CRITICAL);
}
}

return result;
}

private boolean checkFileSizeLimit(HeuristicResult result, Map<String, String> cacheFileToSizeMap) {
boolean limitViolated = false;
for (Map.Entry<String, String> entry : cacheFileToSizeMap.entrySet()) {
String file = entry.getKey();
long size = 0;
try {
size = Long.parseLong(entry.getValue());
} catch (NumberFormatException e) {
// Log the raw, unparseable value rather than the default of 0.
logger.warn("Unable to parse file size value: " + entry.getValue() + " for file: " + file, e);
}

if (size > distributedCacheFileSizeLimit) {
limitViolated = true;
result.addResultDetail(file, Long.toString(size));
}
}
return limitViolated;
}

@Override
public HeuristicConfigurationData getHeuristicConfData() {
return _heuristicConfData;
}
}
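
In summary, apply() returns null when the application data is missing or unsuccessful, or when neither cache property is set; Severity.MODERATE when a file list and its size list differ in length; Severity.CRITICAL when any cached file exceeds the configured limit; and Severity.NONE otherwise.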
27 changes: 27 additions & 0 deletions app/views/help/mapreduce/helpDistributedCacheLimit.scala.html
@@ -0,0 +1,27 @@
@*
* Copyright 2017 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*@

<p>Jobs which put large files (> 500MB) in the distributed cache are flagged.</p>
<p>Files configured via the following properties are considered:</p>
<ul>
<li>
mapreduce.job.cache.files
</li>
<li>
mapreduce.job.cache.archives
</li>
</ul>
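<p>The 500MB threshold can be overridden through the <code>distributed.cache.file.size.limit</code> parameter (value in bytes) in <code>HeuristicConf.xml</code>.</p>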
@@ -0,0 +1,137 @@
package com.linkedin.drelephant.mapreduce.heuristics;

import com.linkedin.drelephant.analysis.ApplicationType;
import com.linkedin.drelephant.analysis.Heuristic;
import com.linkedin.drelephant.analysis.HeuristicResult;
import com.linkedin.drelephant.analysis.Severity;
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData;
import com.linkedin.drelephant.mapreduce.data.MapReduceApplicationData;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertTrue;


/**
* Tests for the <code>DistributedCacheLimitHeuristic</code> class.
*/
public class DistributedCacheLimitHeuristicTest {
private static Map<String, String> paramMap = new HashMap<String, String>();
private static Properties jobConf = new Properties();
private static final String cacheFileList =
"/path/to/firstCacheFile,/path/to/secondCacheFile,/path/to/thirdCacheFile";
private static final String archiveCacheFileList =
"/path/to/firstArchiveCacheFile,/path/to/secondArchiveCacheFile,/path/to/thirdArchiveCacheFile";

private static Heuristic<MapReduceApplicationData> _heuristic = new DistributedCacheLimitHeuristic(
new HeuristicConfigurationData("test.heuristic", "test.class", "test.view", new ApplicationType("mapreduce"),
paramMap));

@Before
public void setup() {
paramMap.put("distributed.cache.file.size.limit", "500000000");
jobConf.setProperty("mapreduce.job.cache.files", cacheFileList);
jobConf.setProperty("mapreduce.job.cache.archives", archiveCacheFileList);
}

/**
* All cache file sizes are within the limit.
*/
@Test
public void testHeuristicResult() {
jobConf.setProperty("mapreduce.job.cache.files.filesizes", "100,200,300");
jobConf.setProperty("mapreduce.job.cache.archives.filesizes", "400,500,600");

MapReduceApplicationData data = new MapReduceApplicationData().setJobConf(jobConf);
HeuristicResult result = _heuristic.apply(data);
assertTrue("Failed to match on expected severity", result.getSeverity() == Severity.NONE);
}

/**
* File size not found for all the files in cache.
*/
@Test
public void testHeuristicResultCacheFilesAndSizeLengthMismatch() {
jobConf.setProperty("mapreduce.job.cache.files.filesizes", "100,200");
MapReduceApplicationData data = new MapReduceApplicationData().setJobConf(jobConf);
HeuristicResult result = _heuristic.apply(data);
assertTrue("Failed to match on expected severity", result.getSeverity() == Severity.MODERATE);
}

/**
* File size not found for all the files in archive cache.
*/
@Test
public void testHeuristicResultArchiveCacheFilesAndSizeLengthMismatch() {
jobConf.setProperty("mapreduce.job.cache.files.filesizes", "100,200,300");
jobConf.setProperty("mapreduce.job.cache.archives.filesizes", "400,500");
MapReduceApplicationData data = new MapReduceApplicationData().setJobConf(jobConf);
HeuristicResult result = _heuristic.apply(data);
assertTrue("Failed to match on expected severity", result.getSeverity() == Severity.MODERATE);
}

/**
* File size limit exceeded for file in cache.
*/
@Test
public void testHeuristicResultCacheFileLimitViolated() {
jobConf.setProperty("mapreduce.job.cache.files.filesizes", "100,200,600000000");
jobConf.setProperty("mapreduce.job.cache.archives.filesizes", "400,500,600");

MapReduceApplicationData data = new MapReduceApplicationData().setJobConf(jobConf);
HeuristicResult result = _heuristic.apply(data);
assertTrue("Failed to match on expected severity", result.getSeverity() == Severity.CRITICAL);
}

/**
* File size limit exceeded for file in archive cache.
*/
@Test
public void testHeuristicResultArchiveCacheFileLimitViolated() {
jobConf.setProperty("mapreduce.job.cache.files.filesizes", "100,200,300");
jobConf.setProperty("mapreduce.job.cache.archives.filesizes", "400,500,600000000");

MapReduceApplicationData data = new MapReduceApplicationData().setJobConf(jobConf);
HeuristicResult result = _heuristic.apply(data);
assertTrue("Failed to match on expected severity", result.getSeverity() == Severity.CRITICAL);
}

/**
* Neither cache is used by the application.
*/
@Test
public void testHeuristicResultNoDistributedCacheFiles() {
jobConf.remove("mapreduce.job.cache.files");
jobConf.remove("mapreduce.job.cache.archives");
MapReduceApplicationData data = new MapReduceApplicationData().setJobConf(jobConf);
HeuristicResult result = _heuristic.apply(data);
assertTrue("Failed to match on expected severity", result == null);
}

/**
* Cache files are not used by the application.
*/
@Test
public void testHeuristicResultWithEmptyCacheFiles() {
jobConf.remove("mapreduce.job.cache.files");
jobConf.setProperty("mapreduce.job.cache.archives.filesizes", "400,500,600");
MapReduceApplicationData data = new MapReduceApplicationData().setJobConf(jobConf);
HeuristicResult result = _heuristic.apply(data);
assertTrue("Failed to match on expected severity", result.getSeverity() == Severity.NONE);
}

/**
* Archive cache is not used by the application.
*/
@Test
public void testHeuristicResultWithEmptyArchiveCacheFiles() {
jobConf.remove("mapreduce.job.cache.archives");
jobConf.setProperty("mapreduce.job.cache.files.filesizes", "100,200,300");
MapReduceApplicationData data = new MapReduceApplicationData().setJobConf(jobConf);
HeuristicResult result = _heuristic.apply(data);
assertTrue("Failed to match on expected severity", result.getSeverity() == Severity.NONE);
}
}
