Dataflow model nearly finished
JerryLead committed Apr 2, 2014
1 parent 1543176 commit dac635c
Showing 16 changed files with 592 additions and 18 deletions.
23 changes: 23 additions & 0 deletions src/dataflow/model/job/Job.java
@@ -0,0 +1,23 @@
package dataflow.model.job;

import java.util.List;

import dataflow.model.mapper.Mapper;
import dataflow.model.reducer.Reducer;

public class Job {

private List<Mapper> mappers;
private List<Reducer> reducers;

public void setMappers(List<Mapper> mappers) {
this.mappers = mappers;
}

public void setReducers(List<Reducer> reducers) {
this.reducers = reducers;
}

}
42 changes: 42 additions & 0 deletions src/dataflow/model/job/JobDataflowModelBuilder.java
@@ -0,0 +1,42 @@
package dataflow.model.job;

import java.util.List;

import dataflow.model.mapper.Mapper;
import dataflow.model.reducer.Reducer;
import profile.job.JobProfile;
import profile.profiler.JobProfileFromSerialization;

public class JobDataflowModelBuilder {


public static void main(String[] args) {
String jobId = "job_201403261726_0001";

String serializeDir = "/Users/xulijie/Documents/DiagOOMSpace/PigMapJoin/";

JobProfile jobProfile = JobProfileFromSerialization.deserialization(serializeDir, jobId);
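// deserialization() reloads the JobProfile that the profiler previously serialized under serializeDir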

Job job = buildDataflow(jobProfile);

/*
System.out.println("## Mapper");
System.out.println(jobProfile.getMapperInfoList().get(0));
System.out.println("\n## Reducer");
System.out.println(jobProfile.getReducerInfoList().get(0));
*/
}

public static Job buildDataflow(JobProfile jobProfile) {
List<Mapper> mappers = MapperDataflowBuilder.build(jobProfile);
List<Reducer> reducers = ReducerDataflowBuilder.build(jobProfile, mappers);

Job job = new Job();
job.setMappers(mappers);
job.setReducers(reducers);

return job;
}

}
32 changes: 32 additions & 0 deletions src/dataflow/model/job/MapperDataflowBuilder.java
@@ -0,0 +1,32 @@
package dataflow.model.job;

import java.util.ArrayList;
import java.util.List;

import dataflow.model.mapper.Mapper;
import profile.job.JobProfile;
import profile.mapper.MapperInfo;

public class MapperDataflowBuilder {

public static List<Mapper> build(JobProfile jobProfile) {

List<Mapper> mappers = new ArrayList<Mapper>();

for(MapperInfo info : jobProfile.getMapperInfoList()) {
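// translate each profiled MapperInfo into a dataflow-model Mapper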
Mapper mapper = new Mapper(jobProfile.getJobConfiguration());

mapper.setBasicInfo(info.getTaskId(), info.isMapRunning(), info.getRunningPhase());
mapper.setInputSplit(info.getInput());
mapper.setSpillBuffer(info.getBuffer());
mapper.setSpills(info.getSpill(), info.getCounters());
mapper.setMerges(info.getMerge(), info.getCounters());

mappers.add(mapper);
}

return mappers;
}

}

33 changes: 33 additions & 0 deletions src/dataflow/model/job/ReducerDataflowBuilder.java
@@ -0,0 +1,33 @@
package dataflow.model.job;

import java.util.ArrayList;
import java.util.List;

import profile.job.JobProfile;
import profile.reducer.ReducerInfo;
import dataflow.model.mapper.Mapper;
import dataflow.model.reducer.Reducer;


public class ReducerDataflowBuilder {

public static List<Reducer> build(JobProfile jobProfile, List<Mapper> mappers) {
List<Reducer> reducers = new ArrayList<Reducer>();

for(ReducerInfo info : jobProfile.getReducerInfoList()) {
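// mappers are passed in so each reducer's shuffle stage can reference its map-side segments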

Reducer reducer = new Reducer(jobProfile.getJobConfiguration());

reducer.setBasicInfo(info.getTaskId(), info.getRunningPhase(), info.isInMemMergeRunning());
reducer.setShuffle(mappers, info.getShuffle(), info.getShuffleBuffer(), info.getMergeInShuffle());
reducer.setMergeCombineFunc(info.getMergeInShuffle(), info.getReducerCounters());
reducer.setSegmentInReduceBuffer(info.getSort());
reducer.setReduce(info.getReducerCounters());

reducers.add(reducer);
}

return reducers;
}

}
31 changes: 26 additions & 5 deletions src/dataflow/model/mapper/Mapper.java
@@ -21,14 +21,15 @@ public class Mapper {
private boolean isMapRunning;
private String runningPhase;
private Configuration conf;
private int id;

private InputSplit split;
private MapFunc mapFunc;
private SpillBuffer spillBuffer;
private MemCombineFunc memCombineFunc;
private List<SpillPiece> spills;
private DiskCombineFunc diskCombineFunc;
private List<Segment> mapOutputs;
private List<Segment> mapOutputSegs;
private List<MergeAction> merges;

private long file_bytes_read;
@@ -46,14 +47,16 @@ public Mapper(Configuration conf) {
memCombineFunc = new MemCombineFunc();
spills = new ArrayList<SpillPiece>();

mapOutputs = new ArrayList<Segment>();
mapOutputSegs = new ArrayList<Segment>();
merges = new ArrayList<MergeAction>();
}

public void setBasicInfo(String taskId, boolean isMapRunning, String runningPhase) {
this.taskId = taskId;
this.isMapRunning = isMapRunning;
this.runningPhase = runningPhase;

id = Integer.parseInt(taskId.substring(taskId.indexOf("_m_") + 3, taskId.lastIndexOf('_')));
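// e.g., an attempt-style id such as "attempt_201403261726_0001_m_000003_0" yields id = 3
// (assumes the task id carries a trailing "_<attempt>" suffix)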
}

public void setInputSplit(Input input) {
@@ -135,7 +138,10 @@ public void setMerges(Merge merge, MapperCounters counters) {
diskCombineFunc.setcCombineOutputRecords(counters.getCombine_output_records()
- combineOutputRecsInSpills - outputRecsInPreviousMerges);
}


for(MergeAction action : merges) {
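// the final output of each merge becomes one map-output segment for its reduce partition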
mapOutputSegs.add(new Segment(id, action.getPartitionId(), action.getRecordsAfter(), action.getBytesAfter()));
}

}

@@ -180,12 +186,27 @@ public DiskCombineFunc getDiskCombineFunc() {
return diskCombineFunc;
}

public List<Segment> getMapOutputs() {
return mapOutputs;
public List<Segment> getMapOutputSegs() {
return mapOutputSegs;
}

public List<MergeAction> getMerges() {
return merges;
}

public boolean isMapRunning() {
return isMapRunning;
}

public String getRunningPhase() {
return runningPhase;
}

public long getTotal_committed_bytes() {
return total_committed_bytes;
}

}
26 changes: 25 additions & 1 deletion src/dataflow/model/mapper/MergeAction.java
@@ -23,7 +23,31 @@ public MergeAction(MergeInfo mergeInfo) {
this.bytesAfter = mergeInfo.getRawLengthAfterMerge();
}


public int getPartitionId() {
return partitionId;
}

public int getSegmentsNum() {
return segmentsNum;
}

public long getRecordsBefore() {
return recordsBefore;
}

public long getBytesBefore() {
return bytesBefore;
}

public long getRecordsAfter() {
return recordsAfter;
}

public long getBytesAfter() {
return bytesAfter;
}

}
28 changes: 28 additions & 0 deletions src/dataflow/model/mapper/Segment.java
@@ -2,6 +2,34 @@

public class Segment {

int partitionId;
int taskId;

long records;
long bytes;

public Segment(int taskId, int partitionId, long records, long bytes) {
this.taskId = taskId;
this.partitionId = partitionId;
this.records = records;
this.bytes = bytes;
}

public long getRecords() {
return records;
}

public long getBytes() {
return bytes;
}

public int getPartitionId() {
return partitionId;
}

public int getTaskId() {
return taskId;
}

}
33 changes: 29 additions & 4 deletions src/dataflow/model/reducer/MergeCombineFunc.java
@@ -1,10 +1,35 @@
package dataflow.model.reducer;

public class MergeCombineFunc {
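// naming note (an assumption, inferred from Mapper.setMerges): the 'c' prefix appears to mark
// records attributed to the current phase, while 't' appears to mark running totals from the task counters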
private long combineInputRecords;
private long combineInputBytes;

private long combineOutputRecords;
private long combineOutputBytes;
private long cCombineInputRecords;

private long cCombineOutputRecords;

private long tCombineInputRecords;

public long getcCombineInputRecords() {
return cCombineInputRecords;
}

public void setcCombineInputRecords(long cCombineInputRecords) {
this.cCombineInputRecords = cCombineInputRecords;
}

public long getcCombineOutputRecords() {
return cCombineOutputRecords;
}

public void setcCombineOutputRecords(long cCombineOutputRecords) {
this.cCombineOutputRecords = cCombineOutputRecords;
}

public long gettCombineInputRecords() {
return tCombineInputRecords;
}

public void settCombineInputRecords(long tCombineInputRecords) {
this.tCombineInputRecords = tCombineInputRecords;
}

}
41 changes: 37 additions & 4 deletions src/dataflow/model/reducer/ReduceFunc.java
@@ -1,10 +1,43 @@
package dataflow.model.reducer;

public class ReduceFunc {
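// same apparent convention as MergeCombineFunc (assumption): 'c' = current-phase values, 't' = totals from counters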
private long reduceInputRecords;
private long reduceInputBytes;
private long tReduceInputRecords;

private long reduceOutputRecords;
private long reduceOutputBytes;
private long cReduceInputRecords;
private long cReduceInputGroups;

private long cReduceOutputRecords;

public long gettReduceInputRecords() {
return tReduceInputRecords;
}

public void settReduceInputRecords(long tReduceInputRecords) {
this.tReduceInputRecords = tReduceInputRecords;
}

public long getcReduceInputRecords() {
return cReduceInputRecords;
}

public void setcReduceInputRecords(long cReduceInputRecords) {
this.cReduceInputRecords = cReduceInputRecords;
}

public long getcReduceInputGroups() {
return cReduceInputGroups;
}

public void setcReduceInputGroups(long cReduceInputGroups) {
this.cReduceInputGroups = cReduceInputGroups;
}

public long getcReduceOutputRecords() {
return cReduceOutputRecords;
}

public void setcReduceOutputRecords(long cReduceOutputRecords) {
this.cReduceOutputRecords = cReduceOutputRecords;
}

}