Skip to content

Commit

Permalink
Profile each task, preliminary dataflow models
Browse files Browse the repository at this point in the history
  • Loading branch information
JerryLead committed Mar 25, 2014
0 parents commit 5afacdc
Show file tree
Hide file tree
Showing 57 changed files with 3,213 additions and 0 deletions.
7 changes: 7 additions & 0 deletions .classpath
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" path="src"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7"/>
<classpathentry kind="lib" path="lib/jsoup-1.6.1.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/bin
17 changes: 17 additions & 0 deletions .project
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>DiagOOM</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
11 changes: 11 additions & 0 deletions .settings/org.eclipse.jdt.core.prefs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.7
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
org.eclipse.jdt.core.compiler.compliance=1.7
org.eclipse.jdt.core.compiler.debug.lineNumber=generate
org.eclipse.jdt.core.compiler.debug.localVariable=generate
org.eclipse.jdt.core.compiler.debug.sourceFile=generate
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
org.eclipse.jdt.core.compiler.source=1.7
Binary file added lib/jsoup-1.6.1.jar
Binary file not shown.
10 changes: 10 additions & 0 deletions src/dataflow/model/mapper/DiskCombineFunc.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package dataflow.model.mapper;

/**
 * Metrics of the combine() function applied while merging on-disk spills
 * in a map task. Sibling model classes (e.g. {@link MapFunc}) expose their
 * counters via accessors; getters/setters added here for consistency —
 * the original fields were private and completely inaccessible.
 */
public class DiskCombineFunc {

    private long combineInputRecords;
    private long combineInputBytes;

    private long combineOutputRecords;
    private long combineOutputBytes;

    public long getCombineInputRecords() {
        return combineInputRecords;
    }

    public void setCombineInputRecords(long combineInputRecords) {
        this.combineInputRecords = combineInputRecords;
    }

    public long getCombineInputBytes() {
        return combineInputBytes;
    }

    public void setCombineInputBytes(long combineInputBytes) {
        this.combineInputBytes = combineInputBytes;
    }

    public long getCombineOutputRecords() {
        return combineOutputRecords;
    }

    public void setCombineOutputRecords(long combineOutputRecords) {
        this.combineOutputRecords = combineOutputRecords;
    }

    public long getCombineOutputBytes() {
        return combineOutputBytes;
    }

    public void setCombineOutputBytes(long combineOutputBytes) {
        this.combineOutputBytes = combineOutputBytes;
    }
}
14 changes: 14 additions & 0 deletions src/dataflow/model/mapper/InputSplit.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package dataflow.model.mapper;

/**
 * Model of the input split fed to one map task.
 * Holds a single counter: the split's length in bytes.
 */
public class InputSplit {

    // Length of this split in bytes (from the job's split metadata).
    private long splitBytes;

    /** Records the split length in bytes. */
    public void setSplitBytes(long splitBytes) {
        this.splitBytes = splitBytes;
    }

    /** @return the split length in bytes previously recorded */
    public long getSplitBytes() {
        return splitBytes;
    }
}
48 changes: 48 additions & 0 deletions src/dataflow/model/mapper/MapFunc.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package dataflow.model.mapper;

import java.io.Serializable;

/**
 * Input/output record and byte counters of the user map() function,
 * mirroring the Hadoop task counters named in the field comments.
 * Serializable so a profiled model can be persisted.
 */
public class MapFunc implements Serializable {

    private static final long serialVersionUID = 3972332854774013306L;

    private long mapInputRecords;  // equals "Map input records"
    private long mapInputBytes;    // equals "HDFS_BYTES_READ"

    private long mapOutputRecords; // equals "Map output records"
    private long mapOutputBytes;   // equals "Map output bytes"

    /** Records the "Map input records" counter. */
    public void setMapInputRecords(long mapInputRecords) {
        this.mapInputRecords = mapInputRecords;
    }

    public long getMapInputRecords() {
        return mapInputRecords;
    }

    /** Records the "HDFS_BYTES_READ" counter. */
    public void setMapInputBytes(long mapInputBytes) {
        this.mapInputBytes = mapInputBytes;
    }

    public long getMapInputBytes() {
        return mapInputBytes;
    }

    /** Records the "Map output records" counter. */
    public void setMapOutputRecords(long mapOutputRecords) {
        this.mapOutputRecords = mapOutputRecords;
    }

    public long getMapOutputRecords() {
        return mapOutputRecords;
    }

    /** Records the "Map output bytes" counter. */
    public void setMapOutputBytes(long mapOutputBytes) {
        this.mapOutputBytes = mapOutputBytes;
    }

    public long getMapOutputBytes() {
        return mapOutputBytes;
    }
}
125 changes: 125 additions & 0 deletions src/dataflow/model/mapper/Mapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package dataflow.model.mapper;

import java.util.List;

/**
 * Dataflow model of a single map task attempt: its input split, map function,
 * spill buffer, combine stages, spills, merges, and final map output segments,
 * plus raw task counters taken from the job history.
 *
 * Fixes in this revision:
 *  - setCounter() now handles "Map output bytes" (MapFunc documents that
 *    counter but it was never dispatched — mapOutputBytes stayed 0).
 *  - setMerges() added: merges had a getter but no setter, so the list could
 *    never be populated.
 *  - Read accessors added for the four write-only counter fields.
 */
public class Mapper {

    // e.g., attempt_201403211644_0002_m_000013_0
    private String taskId;

    private InputSplit split;
    private MapFunc mapFunc;
    private SpillBuffer spillBuffer;
    private MemCombineFunc memCombineFunc;
    private List<Spill> spills;
    private DiskCombineFunc diskCombineFunc;
    private List<Segment> mapOutputs;
    private List<Merge> merges;

    // Raw task counters, named after the history-file counter names.
    private long file_bytes_read;
    private long file_bytes_written;
    private long physical_memory_bytes;
    private long total_committed_bytes;

    public String getTaskId() {
        return taskId;
    }

    public void setTaskId(String taskId) {
        this.taskId = taskId;
    }

    public InputSplit getSplit() {
        return split;
    }

    public void setSplit(InputSplit split) {
        this.split = split;
    }

    public MapFunc getMapFunc() {
        return mapFunc;
    }

    public void setMapFunc(MapFunc mapFunc) {
        this.mapFunc = mapFunc;
    }

    public SpillBuffer getSpillBuffer() {
        return spillBuffer;
    }

    public void setSpillBuffer(SpillBuffer spillBuffer) {
        this.spillBuffer = spillBuffer;
    }

    public MemCombineFunc getMemCombineFunc() {
        return memCombineFunc;
    }

    public void setMemCombineFunc(MemCombineFunc memCombineFunc) {
        this.memCombineFunc = memCombineFunc;
    }

    public List<Spill> getSpills() {
        return spills;
    }

    public void setSpills(List<Spill> spills) {
        this.spills = spills;
    }

    public DiskCombineFunc getDiskCombineFunc() {
        return diskCombineFunc;
    }

    public void setDiskCombineFunc(DiskCombineFunc diskCombineFunc) {
        this.diskCombineFunc = diskCombineFunc;
    }

    public List<Segment> getMapOutputs() {
        return mapOutputs;
    }

    public void setMapOutputs(List<Segment> mapOutputs) {
        this.mapOutputs = mapOutputs;
    }

    public List<Merge> getMerges() {
        return merges;
    }

    /** Setter added for symmetry with getMerges(); the list was previously unsettable. */
    public void setMerges(List<Merge> merges) {
        this.merges = merges;
    }

    public long getFileBytesRead() {
        return file_bytes_read;
    }

    public long getFileBytesWritten() {
        return file_bytes_written;
    }

    public long getPhysicalMemoryBytes() {
        return physical_memory_bytes;
    }

    public long getTotalCommittedBytes() {
        return total_committed_bytes;
    }

    /**
     * Dispatches one raw (counter name, value) pair into this model.
     * Unknown counter names are silently ignored (deliberate: history files
     * contain many counters this model does not track).
     *
     * NOTE(review): the map-related cases assume mapFunc is already set;
     * otherwise this throws NullPointerException — confirm call order.
     *
     * @param name  counter name exactly as it appears in the job history
     * @param value counter value
     */
    public void setCounter(String name, long value) {
        switch (name) {
        case "FILE_BYTES_READ":
            file_bytes_read = value;
            break;

        case "HDFS_BYTES_READ":
            mapFunc.setMapInputBytes(value);
            break;

        case "FILE_BYTES_WRITTEN":
            file_bytes_written = value;
            break;

        case "Map input records":
            mapFunc.setMapInputRecords(value);
            break;

        case "Map output records":
            mapFunc.setMapOutputRecords(value);
            break;

        case "Map output bytes": // previously missing despite MapFunc tracking it
            mapFunc.setMapOutputBytes(value);
            break;

        case "Physical memory (bytes) snapshot":
            physical_memory_bytes = value;
            break;

        case "Total committed heap usage (bytes)":
            total_committed_bytes = value;
            break;
        }
    }

}
10 changes: 10 additions & 0 deletions src/dataflow/model/mapper/MemCombineFunc.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package dataflow.model.mapper;

/**
 * Metrics of the combine() function applied in memory before a spill in a
 * map task. Getters/setters added for consistency with {@link MapFunc} —
 * the original fields were private and completely inaccessible.
 */
public class MemCombineFunc {

    private long combineInputRecords;
    private long combineInputBytes;

    private long combineOutputRecords;
    private long combineOutputBytes;

    public long getCombineInputRecords() {
        return combineInputRecords;
    }

    public void setCombineInputRecords(long combineInputRecords) {
        this.combineInputRecords = combineInputRecords;
    }

    public long getCombineInputBytes() {
        return combineInputBytes;
    }

    public void setCombineInputBytes(long combineInputBytes) {
        this.combineInputBytes = combineInputBytes;
    }

    public long getCombineOutputRecords() {
        return combineOutputRecords;
    }

    public void setCombineOutputRecords(long combineOutputRecords) {
        this.combineOutputRecords = combineOutputRecords;
    }

    public long getCombineOutputBytes() {
        return combineOutputBytes;
    }

    public void setCombineOutputBytes(long combineOutputBytes) {
        this.combineOutputBytes = combineOutputBytes;
    }
}
35 changes: 35 additions & 0 deletions src/dataflow/model/mapper/Merge.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package dataflow.model.mapper;

/**
 * Metrics for one merge pass over spill segments of a single partition in a
 * map task: record/byte totals before and after the merge.
 *
 * Getters added: every field was write-only (set in the constructor or
 * setAfterMerge() and never readable), making the recorded metrics unusable.
 */
public class Merge {
    private int partitionId;
    private int segmentsNum;

    private long recordsBeforeMerge;
    private long rawLengthBeforeMerge;
    private long compressedLengthBeforeMerge;

    private long recordsAfterMerge;
    private long rawLengthAfterMerge;
    private long compressedLengthAfterMerge;

    /**
     * @param partitionId                 partition being merged
     * @param segmentsNum                 number of segments merged together
     * @param rawLengthBeforeMerge        uncompressed bytes before the merge
     * @param compressedLengthBeforeMerge compressed bytes before the merge
     */
    public Merge(int partitionId, int segmentsNum,
            long rawLengthBeforeMerge, long compressedLengthBeforeMerge) {

        this.partitionId = partitionId;
        this.segmentsNum = segmentsNum;
        this.rawLengthBeforeMerge = rawLengthBeforeMerge;
        this.compressedLengthBeforeMerge = compressedLengthBeforeMerge;
    }

    /**
     * Records the post-merge measurements. Note this also sets
     * recordsBeforeMerge — the record count is only known at merge time,
     * unlike the byte lengths passed to the constructor.
     */
    public void setAfterMerge(
            long recordsBeforeMerge, long recordsAfterMerge,
            long rawLengthEnd, long compressedLengthEnd) {

        this.recordsBeforeMerge = recordsBeforeMerge;
        this.recordsAfterMerge = recordsAfterMerge;
        this.rawLengthAfterMerge = rawLengthEnd;
        this.compressedLengthAfterMerge = compressedLengthEnd;

    }

    public int getPartitionId() {
        return partitionId;
    }

    public int getSegmentsNum() {
        return segmentsNum;
    }

    public long getRecordsBeforeMerge() {
        return recordsBeforeMerge;
    }

    public long getRawLengthBeforeMerge() {
        return rawLengthBeforeMerge;
    }

    public long getCompressedLengthBeforeMerge() {
        return compressedLengthBeforeMerge;
    }

    public long getRecordsAfterMerge() {
        return recordsAfterMerge;
    }

    public long getRawLengthAfterMerge() {
        return rawLengthAfterMerge;
    }

    public long getCompressedLengthAfterMerge() {
        return compressedLengthAfterMerge;
    }
}
7 changes: 7 additions & 0 deletions src/dataflow/model/mapper/Segment.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package dataflow.model.mapper;

/**
 * One map-output segment: its record count and byte size.
 * Fields stay package-private (existing code may access them directly);
 * conventional accessors added for consistency with the other model classes.
 */
public class Segment {

    long records;
    long bytes;

    public long getRecords() {
        return records;
    }

    public void setRecords(long records) {
        this.records = records;
    }

    public long getBytes() {
        return bytes;
    }

    public void setBytes(long bytes) {
        this.bytes = bytes;
    }
}
27 changes: 27 additions & 0 deletions src/dataflow/model/mapper/Spill.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package dataflow.model.mapper;

/**
 * Metrics of one spill from a map task's in-memory buffer to disk,
 * optionally with a combine pass applied during the spill.
 *
 * Getters added: all fields were write-only (only settable via the
 * constructor, never readable), so a recorded Spill carried no usable data.
 */
public class Spill {

    private boolean hasCombine;
    private String reason; // record, buffer, flush

    private long recordsBeforeCombine;
    private long bytesBeforeSpill;
    private long recordsAfterCombine;
    private long rawLength;
    private long compressedLength;

    /**
     * @param hasCombine           whether a combine ran during this spill
     * @param reason               spill trigger: "record", "buffer", or "flush"
     * @param recordsBeforeCombine records in the buffer before combining
     * @param bytesBeforeSpill     bytes in the buffer before the spill
     * @param recordsAfterCombine  records written after combining
     * @param rawLength            uncompressed bytes written to disk
     * @param compressedLength     compressed bytes written to disk
     */
    public Spill(boolean hasCombine, String reason, long recordsBeforeCombine,
            long bytesBeforeSpill, long recordsAfterCombine, long rawLength,
            long compressedLength) {
        this.hasCombine = hasCombine;
        this.reason = reason;
        this.recordsBeforeCombine = recordsBeforeCombine;
        this.bytesBeforeSpill = bytesBeforeSpill;
        this.recordsAfterCombine = recordsAfterCombine;
        this.rawLength = rawLength;
        this.compressedLength = compressedLength;
    }

    public boolean hasCombine() {
        return hasCombine;
    }

    public String getReason() {
        return reason;
    }

    public long getRecordsBeforeCombine() {
        return recordsBeforeCombine;
    }

    public long getBytesBeforeSpill() {
        return bytesBeforeSpill;
    }

    public long getRecordsAfterCombine() {
        return recordsAfterCombine;
    }

    public long getRawLength() {
        return rawLength;
    }

    public long getCompressedLength() {
        return compressedLength;
    }

}
53 changes: 53 additions & 0 deletions src/dataflow/model/mapper/SpillBuffer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package dataflow.model.mapper;

/**
 * Model of a map task's in-memory spill buffer (the io.sort.mb buffer and
 * its record-accounting arrays), with the soft limits that trigger spills.
 *
 * Fixes in this revision:
 *  - private field typo "cachedMapOutputRecrods" -> "cachedMapOutputRecords"
 *    (field is private and unreferenced elsewhere, so the rename is safe);
 *  - getIoSortMB() added — io_sort_mb was settable but never readable.
 */
public class SpillBuffer {

    // NOTE(review): these three are never read or written in this class —
    // presumably sizes of the kvbuffer/kvindices/kvoffsets arrays; confirm.
    private long kvbuffer;
    private long kvindices;
    private long kvoffsets;

    private int io_sort_mb;

    // dataBuffer: byte capacity and the soft limit that triggers a spill
    private long softBufferLimit;
    private long kvbufferBytes;

    // recordBuffer: record capacity and its soft spill limit
    private long softRecordLimit;
    private long kvoffsetsLen;

    // current size of records in the buffer
    private long cachedMapOutputRecords;
    private long cachedMapOutputBytes;

    /** Sets the data-buffer capacity and its soft spill threshold. */
    public void setDataBuffer(long softBufferLimit, long kvbufferBytes) {
        this.softBufferLimit = softBufferLimit;
        this.kvbufferBytes = kvbufferBytes;
    }

    /** Sets the record-buffer capacity and its soft spill threshold. */
    public void setRecordBuffer(long softRecordLimit, long kvoffsetsLen) {
        this.softRecordLimit = softRecordLimit;
        this.kvoffsetsLen = kvoffsetsLen;

    }

    public long getSoftBufferLimit() {
        return softBufferLimit;
    }

    public long getKvbufferBytes() {
        return kvbufferBytes;
    }

    public long getSoftRecordLimit() {
        return softRecordLimit;
    }

    public long getKvoffsetsLen() {
        return kvoffsetsLen;
    }

    public void setIoSortMB(int io_sort_mb) {
        this.io_sort_mb = io_sort_mb;
    }

    /** @return the configured io.sort.mb buffer size in MB */
    public int getIoSortMB() {
        return io_sort_mb;
    }
}
10 changes: 10 additions & 0 deletions src/dataflow/model/reducer/MergeCombineFunc.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package dataflow.model.reducer;

/**
 * Metrics of the combine() function applied during a reducer-side merge.
 * Getters/setters added for consistency with the mapper-side model classes —
 * the original fields were private and completely inaccessible.
 */
public class MergeCombineFunc {
    private long combineInputRecords;
    private long combineInputBytes;

    private long combineOutputRecords;
    private long combineOutputBytes;

    public long getCombineInputRecords() {
        return combineInputRecords;
    }

    public void setCombineInputRecords(long combineInputRecords) {
        this.combineInputRecords = combineInputRecords;
    }

    public long getCombineInputBytes() {
        return combineInputBytes;
    }

    public void setCombineInputBytes(long combineInputBytes) {
        this.combineInputBytes = combineInputBytes;
    }

    public long getCombineOutputRecords() {
        return combineOutputRecords;
    }

    public void setCombineOutputRecords(long combineOutputRecords) {
        this.combineOutputRecords = combineOutputRecords;
    }

    public long getCombineOutputBytes() {
        return combineOutputBytes;
    }

    public void setCombineOutputBytes(long combineOutputBytes) {
        this.combineOutputBytes = combineOutputBytes;
    }

}
6 changes: 6 additions & 0 deletions src/dataflow/model/reducer/OnDiskSeg.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package dataflow.model.reducer;

/**
 * One on-disk segment on the reducer side: record count and byte size.
 * Fields stay package-private (existing code may access them directly);
 * conventional accessors added for consistency with the other model classes.
 */
public class OnDiskSeg {

    long records;
    long bytes;

    public long getRecords() {
        return records;
    }

    public void setRecords(long records) {
        this.records = records;
    }

    public long getBytes() {
        return bytes;
    }

    public void setBytes(long bytes) {
        this.bytes = bytes;
    }
}
Loading

0 comments on commit 5afacdc

Please sign in to comment.