Skip to content

Commit

Permalink
[FLINK-3949] [metrics] Add numSplitsProcessed counter metric.
Browse files Browse the repository at this point in the history
This closes apache#2119
  • Loading branch information
zentol authored and fhueske committed Jun 18, 2016
1 parent 18744b2 commit 5a0c268
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void invoke() throws Exception {
LOG.debug(getLogString("Starting data source operator"));

RuntimeContext ctx = createRuntimeContext();
Counter splitCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
Counter numRecordsOut = ctx.getMetricGroup().counter("numRecordsOut");

if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
Expand Down Expand Up @@ -172,7 +172,7 @@ public void invoke() throws Exception {
// close. We close here such that a regular close throwing an exception marks a task as failed.
format.close();
}
splitCounter.inc();
completedSplitsCounter.inc();
} // end for all input splits

// close the collector. if it is a chaining task collector, it will close its chained tasks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
Expand Down Expand Up @@ -235,6 +236,7 @@ public boolean isRunning() {
public void run() {
try {

Counter completedSplitsCounter = getMetricGroup().counter("numSplitsProcessed");
this.format.openInputFormat();

while (this.isRunning) {
Expand Down Expand Up @@ -290,6 +292,7 @@ public void run() {
}
}
}
completedSplitsCounter.inc();

} finally {
// close and prepare for the next iteration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

Expand Down Expand Up @@ -70,6 +71,7 @@ public void open(Configuration parameters) throws Exception {
public void run(SourceContext<OUT> ctx) throws Exception {
try {

Counter completedSplitsCounter = getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
if (isRunning && format instanceof RichInputFormat) {
((RichInputFormat) format).openInputFormat();
}
Expand All @@ -86,6 +88,7 @@ public void run(SourceContext<OUT> ctx) throws Exception {
ctx.collect(nextElement);
}
format.close();
completedSplitsCounter.inc();

if (isRunning) {
isRunning = splitIterator.hasNext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
Expand Down Expand Up @@ -252,6 +254,11 @@ private MockRuntimeContext(LifeCycleTestInputFormat format, int noOfSplits) {
this.format = format;
}

@Override
public MetricGroup getMetricGroup() {
return new UnregisteredMetricsGroup();
}

@Override
public InputSplitProvider getInputSplitProvider() {
try {
Expand Down

0 comments on commit 5a0c268

Please sign in to comment.