Skip to content

Commit

Permalink
add step stats and collecting logics
Browse files Browse the repository at this point in the history
  • Loading branch information
Dennis Kang committed Mar 15, 2015
1 parent e01ef5c commit 9af56c6
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public abstract class AbstractStep implements Step
protected DataDef dataDef;
private HashMap<String, Variable> variables;
private Job job;
private StepStats stats;


public Job getJob()
Expand Down Expand Up @@ -123,6 +124,7 @@ public AbstractStep(String name, Job job)
this.running = new AtomicBoolean();
this.stopped = new AtomicBoolean();
this.paused = new AtomicBoolean();
this.stats = StepStatsManager.getInstance().createStepStats(this);
}

public String getName() {
Expand Down Expand Up @@ -237,6 +239,7 @@ public DataRow getRow() throws ETLException
if (row != null)
{
dataRow.setDataDef(ds.getDataDef());
this.stats.addLinesRead();
break;
}

Expand All @@ -250,6 +253,7 @@ public DataRow getRow() throws ETLException
else
{
dataRow.setDataDef(ds.getDataDef());
this.stats.addLinesRead();
break;
}
}
Expand Down Expand Up @@ -327,6 +331,7 @@ public DataRow getRowFrom(DataSet dataSet) throws ETLException

dataRow.setRow(row);
dataRow.setDataDef(dataSet.getDataDef());
this.getStats().addLinesRead();

return dataRow;
}
Expand Down Expand Up @@ -372,6 +377,7 @@ public void putRow(DataDef dataDef, Object[] row) throws ETLException
for (DataSet ds : this.outputDataSets)
{
while (!ds.putRow(dataDef, row) && !isStopped()) ;
stats.addLinesWritten();
}
}

Expand Down Expand Up @@ -446,6 +452,7 @@ public boolean filterRow() throws ETLException
{
if (!((Boolean) value).booleanValue())
{
this.stats.addLinesFiltered();
filterLogger.debug("{}",
this.printRow(this.getContext().getCurrentInputRow(), ";", "\""));
return true;
Expand All @@ -456,4 +463,9 @@ public boolean filterRow() throws ETLException
else
throw new ETLException("Filter formula returns a non boolean value: " + value.toString());
}

public StepStats getStats()
{
return stats;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ public void process()
step.dispose();
}

logger.info(StepStatsManager.getInstance().toString());
logger.info("Job is done");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public interface Step
public void stopAll();
public boolean isInitialized();
public void setInitialized(boolean initialized);
public StepStats getStats();

public void setOutputDone();
}
118 changes: 118 additions & 0 deletions smartETL/src/main/java/org/f3tools/incredible/smartETL/StepStats.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package org.f3tools.incredible.smartETL;

/**
* Capture all stats of each step
* @author Dennis Kang
* @since 2015/03/15
*
*/
public class StepStats
{
// number of lines read from previous steps
private long linesRead;
// number of lines written to next step(s)
private long linesWritten;
// number of lines read from file or database
private long linesInput;
// number of lines written to file or database
private long linesOutput;
// number of updates in a database table or file
private long linesUpdated;
// number of lines skipped
private long linesFiltered;
// number of lines errored
private long linesErrored;

private Step step;

public StepStats(Step step)
{
this.step = step;
}

public long getLinesRead()
{
return linesRead;
}

public void addLinesRead()
{
linesRead++;
}

public long getLinesWritten()
{
return linesWritten;
}

public void addLinesWritten()
{
linesWritten++;
}

public long getLinesInput()
{
return linesInput;
}

public void addLinesInput()
{
linesInput++;
}

public long getLinesOutput()
{
return linesOutput;
}

public void addLinesOutput()
{
linesOutput++;
}

public long getLinesUpdated()
{
return linesUpdated;
}

public void addLinesUpdated()
{
linesUpdated++;
}

public long getLinesFiltered()
{
return linesFiltered;
}

public void addLinesFiltered()
{
linesFiltered++;
}

public long getLinesErrored()
{
return linesErrored;
}

public void addLinesErrored()
{
linesErrored++;
}

public String toString()
{
StringBuffer sb = new StringBuffer();

sb.append("Step statistics: " + step.getName() + "\n");
sb.append("Lines read: " + linesRead + "\n");
sb.append("Lines written: " + linesWritten + "\n");
sb.append("Lines inputed: " + linesInput + "\n");
sb.append("Lines outputed: " + linesOutput + "\n");
sb.append("Lines updated: " + linesUpdated + "\n");
sb.append("Lines filtered: " + linesFiltered + "\n");
sb.append("Lines with errs: " + linesErrored + "\n");

return sb.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.f3tools.incredible.smartETL;

import java.util.HashMap;
import java.util.Map;

public class StepStatsManager
{
private static StepStatsManager instance;
private Map<String, StepStats> stepStatsMap;

private StepStatsManager()
{
stepStatsMap = new HashMap<String, StepStats>();
}

public static synchronized StepStatsManager getInstance()
{
if (instance == null) instance = new StepStatsManager();
return instance;
}

public synchronized StepStats createStepStats(Step step)
{
StepStats stepStats = new StepStats(step);
stepStatsMap.put(step.getName(), stepStats);
return stepStats;
}

public String toString()
{
StringBuffer sb = new StringBuffer();

for(StepStats stats : stepStatsMap.values())
{
sb.append(stats.toString());
sb.append("\n");
}

return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public boolean processRow() throws ETLException
DataRow r = new DataRow();
r.setDataDef(dataDef);
r.setRow(row);
getStats().addLinesInput();

this.setCurrentInputRow(r);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public boolean processRow() throws ETLException

bw.write(this.printRow(r, this.csvOutputDef.getDelimiter(), this.csvOutputDef.getQuote()));
bw.write("\n");
this.getStats().addLinesOutput();
}
catch (IOException e)
{
Expand Down

0 comments on commit 9af56c6

Please sign in to comment.