
Commit

Modify UserObjectModel, finish the basic diagnosis
JerryLead committed Apr 10, 2014
1 parent a1c2919 commit 76fafd7
Showing 18 changed files with 863 additions and 138 deletions.
15 changes: 15 additions & 0 deletions src/dataflow/model/mapper/DiskCombineFunc.java
@@ -10,6 +10,8 @@ public class DiskCombineFunc {

private long tCombineInputRecords = -1;

private long inputRecsInPreviousMerges = -1;

public long getcCombineInputRecords() {
return cCombineInputRecords;
}
@@ -33,6 +35,15 @@ public long gettCombineInputRecords() {
public void settCombineInputRecords(long tCombineInputRecords) {
this.tCombineInputRecords = tCombineInputRecords;
}

public void setInputRecsInPreviousMerges(long inputRecsInPreviousMerges) {
this.inputRecsInPreviousMerges = inputRecsInPreviousMerges;
}

public long getInputRecsInPreviousMerges() {
return inputRecsInPreviousMerges;
}

public String toString() {
StringBuilder sb = new StringBuilder();
@@ -43,4 +54,8 @@ public String toString() {

return sb.toString();
}

}
5 changes: 5 additions & 0 deletions src/dataflow/model/mapper/MapFunc.java
@@ -15,6 +15,11 @@ public class MapFunc implements Serializable {

private long tmapInputBytes; // total "HDFS_BYTES_READ"

public long predictTotalInputRecords() {
return (long) ((double)tmapInputBytes / cmapInputBytes * cmapInputRecords);
}

public String toString() {
StringBuilder sb = new StringBuilder();
DecimalFormat f = new DecimalFormat(",###");
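
Note: the new predictTotalInputRecords() extrapolates the task's total input records from the byte/record counts seen so far, assuming a constant bytes-per-record ratio. A minimal standalone sketch of that proportionality (all counter values below are made up for illustration):

// Hypothetical demo of the extrapolation: totalRecords ~= totalBytes / bytesSoFar * recordsSoFar
public class PredictDemo {
    public static void main(String[] args) {
        long cmapInputBytes = 64L << 20;   // bytes read so far (assumed value)
        long cmapInputRecords = 500000L;   // records read so far (assumed value)
        long tmapInputBytes = 256L << 20;  // total "HDFS_BYTES_READ" (assumed value)

        long predicted = (long) ((double) tmapInputBytes / cmapInputBytes * cmapInputRecords);
        System.out.println(predicted); // prints 2000000
    }
}
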
7 changes: 6 additions & 1 deletion src/dataflow/model/mapper/Mapper.java
@@ -82,8 +82,10 @@ public Mapper(Configuration conf) {
this.conf = conf;
split = new InputSplit();
mapFunc = new MapFunc();
spillBuffer = new SpillBuffer(conf);

this.hasReducer = conf.getMapred_reduce_tasks() != 0;
if(hasReducer)
spillBuffer = new SpillBuffer(conf);

if(conf.getMapreduce_combine_class() != null)
memCombineFunc = new MemCombineFunc();
@@ -135,6 +137,7 @@ public void setSpills(Spill spill, MapperCounters counters) {
if(memCombineFunc != null && info != null) {
memCombineFunc.settCombineInputRecords(info.getRecordsBeforeCombine());

memCombineFunc.setInputRecsInPreviousSpills(inputRecsInPreviousSpills);

memCombineFunc.setcCombineInputRecords(counters.getCombine_input_records() - inputRecsInPreviousSpills);
memCombineFunc.setcCombineOutputRecords(counters.getCombine_output_records() - outputRecsInPreviousSpills);
@@ -176,6 +179,8 @@ public void setMerges(Merge merge, MapperCounters counters) {
combineOutputRecsInSpills += sp.getRecordsAfter();
}

diskCombineFunc.setInputRecsInPreviousMerges(inputRecsInPreviousMerges);

diskCombineFunc.setcCombineInputRecords(counters.getCombine_input_records()
- combineInputRecsInSpills - inputRecsInPreviousMerges);
diskCombineFunc.setcCombineOutputRecords(counters.getCombine_output_records()
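
Note: setSpills()/setMerges() record inputRecsInPreviousSpills/inputRecsInPreviousMerges because the combine counters accumulate across the whole task; the per-spill (or per-merge) contribution is the delta between the current counter snapshot and what earlier phases already consumed. A minimal sketch of that bookkeeping (all values hypothetical, not the committed code):

// Cumulative counter -> per-spill delta, as done for combine_input_records above.
public class DeltaDemo {
    public static void main(String[] args) {
        long[] counterAfterEachSpill = {1000, 2500, 4000}; // assumed cumulative snapshots
        long inputRecsInPreviousSpills = 0;
        for (long snapshot : counterAfterEachSpill) {
            long thisSpill = snapshot - inputRecsInPreviousSpills; // records combined in this spill only
            System.out.println("combine input records in this spill = " + thisSpill);
            inputRecsInPreviousSpills = snapshot; // carry forward for the next spill
        }
        // prints 1000, then 1500, then 1500
    }
}
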
10 changes: 10 additions & 0 deletions src/dataflow/model/mapper/MemCombineFunc.java
@@ -10,6 +10,7 @@ public class MemCombineFunc {

private long tCombineInputRecords = -1;

private long inputRecsInPreviousSpills = -1;

public long getcCombineInputRecords() {
return cCombineInputRecords;
@@ -35,6 +36,15 @@ public void settCombineInputRecords(long tCombineInputRecords) {
this.tCombineInputRecords = tCombineInputRecords;
}

public void setInputRecsInPreviousSpills(long inputRecsInPreviousSpills) {
this.inputRecsInPreviousSpills = inputRecsInPreviousSpills;
}

public long getInputRecsInPreviousSpills() {
return inputRecsInPreviousSpills;
}

public String toString() {
StringBuilder sb = new StringBuilder();
DecimalFormat f = new DecimalFormat(",###");
26 changes: 23 additions & 3 deletions src/dataflow/model/mapper/SpillBuffer.java
@@ -21,6 +21,7 @@ public class SpillBuffer {


public SpillBuffer(Configuration conf) {
// float spillper = conf.getIo_sort_spill_percent();
float recper = conf.getIo_sort_record_percent();
this.io_sort_mb = conf.getIo_sort_mb();
@@ -35,7 +36,6 @@ public SpillBuffer(Configuration conf) {
// each entry in kvoffsets/kvindices is an int; per record, kvindices holds three entries while kvoffsets holds one
this.kvoffsets = recordCapacity;
this.kvindices = recordCapacity * 3;

}

public String toString() {
Expand Down Expand Up @@ -84,7 +84,27 @@ public long getKvoffsetsLen() {

public void setIoSortMB(int io_sort_mb) {
this.io_sort_mb = io_sort_mb;
}

public int getIo_sort_mb() {
return io_sort_mb;
}

public void setIo_sort_mb(int io_sort_mb) {
this.io_sort_mb = io_sort_mb;
}

public long getKvbuffer() {
return kvbuffer;
}

public long getKvindices() {
return kvindices;
}

public long getKvoffsets() {
return kvoffsets;
}

}
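
Note: the constructor's sizes follow Hadoop 1.x MapTask buffer accounting: the io.sort.mb bytes are split between the raw key/value buffer (kvbuffer) and record metadata, where each record costs 16 accounting bytes (one int in kvoffsets plus three ints in kvindices: partition, key start, value start). A sketch of that arithmetic under assumed 1.x defaults (io.sort.mb = 100, io.sort.record.percent = 0.05):

// Sketch of the sizing mirrored by the constructor above (assumed semantics, not the committed code).
public class SpillBufferSizing {
    public static void main(String[] args) {
        int ioSortMb = 100;    // io.sort.mb (assumed default)
        float recper = 0.05f;  // io.sort.record.percent (assumed default)
        int maxMemUsage = ioSortMb << 20;

        int recordCapacityBytes = (int) (maxMemUsage * recper);
        recordCapacityBytes -= recordCapacityBytes % 16;         // whole records only: 16 bytes each
        long kvbufferBytes = maxMemUsage - recordCapacityBytes;  // raw key/value bytes

        int recordCapacity = recordCapacityBytes / 16;
        long kvoffsets = recordCapacity;            // one int per record
        long kvindices = (long) recordCapacity * 3; // three ints per record

        System.out.println("kvbuffer = " + kvbufferBytes + " bytes");
        System.out.println("kvoffsets = " + kvoffsets + " entries, kvindices = " + kvindices + " entries");
    }
}
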
10 changes: 10 additions & 0 deletions src/dataflow/model/reducer/MergeCombineFunc.java
@@ -10,6 +10,8 @@ public class MergeCombineFunc {

private long tCombineInputRecords = -1;

private long inputRecsInPreviousMerges = -1;

public long getcCombineInputRecords() {
return cCombineInputRecords;
}
@@ -45,4 +47,12 @@ public String toString() {
return sb.toString();
}

public void setInputRecsInPreviousMerges(long inputRecsInPreviousMerges) {
this.inputRecsInPreviousMerges = inputRecsInPreviousMerges;
}

public long getInputRecsInPreviousMerges() {
return inputRecsInPreviousMerges;
}

}
20 changes: 19 additions & 1 deletion src/dataflow/model/reducer/Reducer.java
@@ -31,7 +31,7 @@ public class Reducer {
private SegmentsInShuffle segsInShuffle;

private MergeCombineFunc mergeCombineFunc;
private List<OnDiskSeg> onDiskSegs;
//private List<OnDiskSeg> onDiskSegs;
private List<Segment> segmentsInReduceBuf;
private ReduceFunc reduceFunc;

@@ -128,6 +128,8 @@ public void setMergeCombineFunc(MergeInShuffle mergeInShuffle, ReducerCounters counters) {
mergeCombineFunc.setcCombineInputRecords(counters.getCombine_input_records() - inputRecsInPreviousMerges);
mergeCombineFunc.setcCombineOutputRecords(counters.getCombine_output_records() - outputRecsInPreviousMerges);

mergeCombineFunc.setInputRecsInPreviousMerges(inputRecsInPreviousMerges);

}
}

@@ -158,6 +160,22 @@ public void setReduce(ReducerCounters counters) {
public SegmentsInShuffle getSegsInShuffle() {
return segsInShuffle;
}

public String getRunningPhase() {
return runningPhase;
}

public boolean isInMemMergeRunning() {
return isInMemMergeRunning;
}

public MergeCombineFunc getMergeCombineFunc() {
return mergeCombineFunc;
}

public ReduceFunc getReduceFunc() {
return reduceFunc;
}

}
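
Note: the new getters make the reducer's shuffle/merge state readable from the diagnosis side. A hedged usage sketch (assumes a Reducer already built by JobDataflowModelBuilder, as in OOMAnalyzer below; this helper is not part of the commit):

// Hypothetical report helper using the getters added above.
static void printReducerState(dataflow.model.reducer.Reducer reducer) {
    System.out.println("running phase = " + reducer.getRunningPhase());
    System.out.println("in-memory merge running = " + reducer.isInMemMergeRunning());
    if (reducer.getMergeCombineFunc() != null)
        System.out.println(reducer.getMergeCombineFunc()); // uses MergeCombineFunc.toString()
    System.out.println(reducer.getReduceFunc());
}
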
88 changes: 88 additions & 0 deletions src/diagnosis/job/OOMAnalyzer.java
@@ -0,0 +1,88 @@
package diagnosis.job;

import object.model.job.DumpedObjectsAnalyzer;
import object.model.mapper.MapperObject;
import object.model.mapper.MapperObjectModel;
import object.model.reducer.ReducerObject;
import object.model.reducer.ReducerObjectModel;
import dataflow.model.job.Job;
import dataflow.model.job.JobDataflowModelBuilder;
import dataflow.model.mapper.Mapper;
import dataflow.model.reducer.Reducer;
import profile.job.JobProfile;
import profile.profiler.JobProfileFromSerialization;

public class OOMAnalyzer {

public static void main(String[] args) {
String jobId = "job_201404061331_0008";
String oomTaskId = "attempt_201403211644_0007_m_000000_0";

String serializeDir = "/Users/xulijie/Documents/DiagOOMSpace/PigMapJoin/";

// profile obtained from logs, counters, JVM usage and etc.
JobProfile jobProfile = JobProfileFromSerialization.deserialization(serializeDir, jobId);

// dataflow
Job job = JobDataflowModelBuilder.buildDataflow(jobProfile);

Mapper mapper = null;
Reducer reducer = null;

if(oomTaskId.contains("_m_")) {
int mapperId = Integer.parseInt(oomTaskId.substring(oomTaskId.indexOf("_m_") + 3, oomTaskId.lastIndexOf('_')));
mapper = job.getMappers().get(mapperId);
} else {
int reducerId = Integer.parseInt(oomTaskId.substring(oomTaskId.indexOf("_r_") + 3, oomTaskId.lastIndexOf('_')));
reducer = job.getReducers().get(reducerId);
}

// in-memory framework and user objects
DumpedObjectsAnalyzer analyzer = new DumpedObjectsAnalyzer(serializeDir);
analyzer.parseEachDump();

if(mapper != null)
diagnoseMapper(mapper, analyzer);
else if(reducer != null)
diagnoseReducer(reducer, analyzer);

}

private static void diagnoseMapper(Mapper mapper, DumpedObjectsAnalyzer analyzer) {

// put the dataflow and object information into an integrated class ==> MapperObject
MapperObject mapperObj = new MapperObject(mapper, analyzer.getBufferObjs(),
analyzer.getUserObjsPerDumpList());

MapperObjectModel mapperModel = new MapperObjectModel(mapperObj);
mapperModel.buildModel();
mapperModel.displayReport();

}

private static void diagnoseReducer(Reducer reducer, DumpedObjectsAnalyzer analyzer) {

ReducerObject reducerObj = new ReducerObject(reducer, analyzer.getSegments(),
analyzer.getUserObjsPerDumpList());

ReducerObjectModel reducerModel = new ReducerObjectModel(reducerObj);
reducerModel.buildModel();
reducerModel.displayReport();
}

public void displayDataflow(Job job, int mapperId, int reducerId) {
System.out.println("## Mapper");
System.out.println(job.getMappers().get(mapperId));

System.out.println("\n## Reducer");
System.out.println(job.getReducers().get(reducerId));
}

}
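
Note: OOMAnalyzer recovers the mapper/reducer index by substring-parsing the attempt id between the "_m_" (or "_r_") marker and the trailing attempt number. A quick standalone check of that parsing (demo value only):

public class AttemptIdParseDemo {
    public static void main(String[] args) {
        String oomTaskId = "attempt_201403211644_0007_m_000000_0";
        // digits between "_m_" and the last '_' (which precedes the attempt number)
        int mapperId = Integer.parseInt(
                oomTaskId.substring(oomTaskId.indexOf("_m_") + 3, oomTaskId.lastIndexOf('_')));
        System.out.println(mapperId); // prints 0
    }
}
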
