Skip to content

Commit

Permalink
[FLINK-13764][task,metrics] Pass the counter of numRecordsIn into the…
Browse files Browse the repository at this point in the history
… constructor of StreamInputProcessor

Currently the counter of numRecordsIn is setup while processing input in processor. In order to integrate the processing logic based on
 StreamTaskInput#emitNext(Output) later, we need to pass the counter into output functions then. So there are three reasons to do this:

It is the precondition of following integration work.
We could make the counter as final fields in StreamOneInputProcessor and StreamTwoInputSelectableProcessor.
We could reuse the counter setup logic for all the input processors.

There should be no side effects if we make the counter setup a bit earlier than the previous way.
  • Loading branch information
zhijiangW committed Aug 23, 2019
1 parent aa9ac7e commit a7f1fee
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
Expand Down Expand Up @@ -85,7 +83,7 @@ public final class StreamOneInputProcessor<IN> implements StreamInputProcessor {
// ---------------- Metrics ------------------

private final WatermarkGauge watermarkGauge;
private Counter numRecordsIn;
private final Counter numRecordsIn;

@SuppressWarnings("unchecked")
public StreamOneInputProcessor(
Expand All @@ -101,7 +99,8 @@ public StreamOneInputProcessor(
TaskIOMetricGroup metrics,
WatermarkGauge watermarkGauge,
String taskName,
OperatorChain<?, ?> operatorChain) throws IOException {
OperatorChain<?, ?> operatorChain,
Counter numRecordsIn) throws IOException {

InputGate inputGate = InputGateUtil.createInputGate(inputGates);

Expand All @@ -127,6 +126,7 @@ public StreamOneInputProcessor(
metrics.gauge("checkpointAlignmentTime", barrierHandler::getAlignmentDurationNanos);

this.operatorChain = checkNotNull(operatorChain);
this.numRecordsIn = checkNotNull(numRecordsIn);
}

@Override
Expand All @@ -141,8 +141,6 @@ public CompletableFuture<?> isAvailable() {

@Override
public boolean processInput() throws Exception {
initializeNumRecordsIn();

StreamElement recordOrMark = input.pollNextNullable();
if (recordOrMark != null) {
int channel = input.getLastChannel();
Expand Down Expand Up @@ -189,17 +187,6 @@ private void checkFinished() throws Exception {
}
}

private void initializeNumRecordsIn() {
if (numRecordsIn == null) {
try {
numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
} catch (Exception e) {
LOG.warn("An exception occurred during the metrics setup.", e);
numRecordsIn = new SimpleCounter();
}
}
}

@Override
public void close() throws IOException {
input.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
Expand Down Expand Up @@ -94,7 +92,7 @@ public final class StreamTwoInputSelectableProcessor<IN1, IN2> implements Stream

private InputSelection inputSelection;

private Counter numRecordsIn;
private final Counter numRecordsIn;

private boolean isPrepared;

Expand All @@ -113,7 +111,8 @@ public StreamTwoInputSelectableProcessor(
WatermarkGauge input1WatermarkGauge,
WatermarkGauge input2WatermarkGauge,
String taskName,
OperatorChain<?, ?> operatorChain) throws IOException {
OperatorChain<?, ?> operatorChain,
Counter numRecordsIn) throws IOException {

checkState(streamOperator instanceof InputSelectable);

Expand Down Expand Up @@ -146,6 +145,7 @@ public StreamTwoInputSelectableProcessor(
new ForwardingValveOutputHandler(streamOperator, lock, streamStatusMaintainer, input2WatermarkGauge, 1));

this.operatorChain = checkNotNull(operatorChain);
this.numRecordsIn = checkNotNull(numRecordsIn);

this.firstStatus = StreamStatus.ACTIVE;
this.secondStatus = StreamStatus.ACTIVE;
Expand Down Expand Up @@ -329,14 +329,6 @@ private void prepareForProcessing() {
// method take effect.
inputSelection = inputSelector.nextSelection();

try {
numRecordsIn = ((OperatorMetricGroup) streamOperator
.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
} catch (Exception e) {
LOG.warn("An exception occurred during the metrics setup.", e);
numRecordsIn = new SimpleCounter();
}

isPrepared = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ public void init() throws Exception {
getEnvironment().getMetricGroup().getIOMetricGroup(),
inputWatermarkGauge,
getTaskNameWithSubtaskAndId(),
operatorChain);
operatorChain,
setupNumRecordsInCounter(headOperator));
}
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
// wrap watermark gauge since registered metrics must be unique
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FileSystemSafetyNet;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
Expand All @@ -36,6 +38,7 @@
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
Expand Down Expand Up @@ -319,6 +322,15 @@ public StreamTaskStateInitializer createStreamTaskStateInitializer() {
timerService);
}

protected Counter setupNumRecordsInCounter(StreamOperator streamOperator) {
try {
return ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
} catch (Exception e) {
LOG.warn("An exception occurred during the metrics setup.", e);
return new SimpleCounter();
}
}

@VisibleForTesting
SynchronousSavepointLatch getSynchronousSavepointLatch() {
return syncSavepointLatch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ protected void createInputProcessor(
input1WatermarkGauge,
input2WatermarkGauge,
getTaskNameWithSubtaskAndId(),
operatorChain);
operatorChain,
setupNumRecordsInCounter(headOperator));
}
}

0 comments on commit a7f1fee

Please sign in to comment.