Skip to content

Commit

Permalink
[FLINK-30536][runtime] Remove CountingOutput from per-record code pat…
Browse files Browse the repository at this point in the history
…h for most operators

This closes apache#21579.
  • Loading branch information
lindong28 authored Jan 9, 2023
1 parent 8aa446a commit c45c4a0
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,23 +159,13 @@ public void setup(
final Environment environment = containingTask.getEnvironment();
this.container = containingTask;
this.config = config;
try {
InternalOperatorMetricGroup operatorMetricGroup =
environment
.getMetricGroup()
.getOrAddOperator(config.getOperatorID(), config.getOperatorName());
this.output = registerCounterOnOutput(output, operatorMetricGroup);
if (config.isChainEnd()) {
operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
}
this.metrics = operatorMetricGroup;
} catch (Exception e) {
LOG.warn("An error occurred while instantiating task metrics.", e);
this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
this.output = output;
}

this.output = output;
this.metrics =
environment
.getMetricGroup()
.getOrAddOperator(config.getOperatorID(), config.getOperatorName());
this.combinedWatermark = IndexedCombinedWatermarkStatus.forInputsCount(2);

try {
Configuration taskManagerConfig = environment.getTaskManagerInfo().getConfiguration();
int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
Expand Down Expand Up @@ -646,10 +636,4 @@ public OperatorID getOperatorID() {
protected Optional<InternalTimeServiceManager<?>> getTimeServiceManager() {
return Optional.ofNullable(timeServiceManager);
}

protected Output<StreamRecord<OUT>> registerCounterOnOutput(
Output<StreamRecord<OUT>> output, OperatorMetricGroup operatorMetricGroup) {
return new CountingOutput<>(
output, operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,50 +104,26 @@ public abstract class AbstractStreamOperatorV2<OUT>
public AbstractStreamOperatorV2(StreamOperatorParameters<OUT> parameters, int numberOfInputs) {
final Environment environment = parameters.getContainingTask().getEnvironment();
config = parameters.getStreamConfig();
CountingOutput<OUT> countingOutput;
InternalOperatorMetricGroup operatorMetricGroup;
try {
operatorMetricGroup =
environment
.getMetricGroup()
.getOrAddOperator(config.getOperatorID(), config.getOperatorName());
countingOutput =
new CountingOutput(
parameters.getOutput(),
operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
if (config.isChainEnd()) {
operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
}
} catch (Exception e) {
LOG.warn("An error occurred while instantiating task metrics.", e);
countingOutput = null;
operatorMetricGroup = null;
}

if (countingOutput == null || operatorMetricGroup == null) {
metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
output = parameters.getOutput();
} else {
metrics = operatorMetricGroup;
output = countingOutput;
}

output = parameters.getOutput();
metrics =
environment
.getMetricGroup()
.getOrAddOperator(config.getOperatorID(), config.getOperatorName());
latencyStats =
createLatencyStats(
environment.getTaskManagerInfo().getConfiguration(),
parameters.getContainingTask().getIndexInSubtaskGroup());

processingTimeService = Preconditions.checkNotNull(parameters.getProcessingTimeService());
executionConfig = parameters.getContainingTask().getExecutionConfig();
userCodeClassLoader = parameters.getContainingTask().getUserCodeClassLoader();
cancelables = parameters.getContainingTask().getCancelables();
this.combinedWatermark = IndexedCombinedWatermarkStatus.forInputsCount(numberOfInputs);
combinedWatermark = IndexedCombinedWatermarkStatus.forInputsCount(numberOfInputs);

runtimeContext =
new StreamingRuntimeContext(
environment,
environment.getAccumulatorRegistry().getUserMap(),
operatorMetricGroup,
metrics,
getOperatorID(),
processingTimeService,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,23 @@
package org.apache.flink.streaming.api.operators;

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.OutputTag;

/** Wrapping {@link Output} that updates metrics on the number of emitted elements. */
public class CountingOutput<OUT> implements Output<StreamRecord<OUT>> {
private final Output<StreamRecord<OUT>> output;
public class CountingOutput<OUT> implements WatermarkGaugeExposingOutput<StreamRecord<OUT>> {
private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> output;
private final Counter numRecordsOut;

public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) {
public CountingOutput(
WatermarkGaugeExposingOutput<StreamRecord<OUT>> output, Counter numRecordsOut) {
this.output = output;
this.numRecordsOut = counter;
this.numRecordsOut = numRecordsOut;
}

@Override
Expand Down Expand Up @@ -66,4 +69,9 @@ public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
public void close() {
output.close();
}

@Override
public Gauge<Long> getWatermarkGauge() {
return output.getWatermarkGauge();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.runtime.state.StateInitializationContext;
Expand All @@ -42,7 +41,6 @@
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.api.watermark.Watermark;
Expand Down Expand Up @@ -218,22 +216,6 @@ public void close() throws Exception {
closeAll(sinkWriter, super::close);
}

/**
* Skip registering numRecordsOut counter on output.
*
* <p>Metric "numRecordsOut" is defined as the total number of records written to the external
* system in FLIP-33, but this metric is occupied in AbstractStreamOperator as the number of
* records sent to downstream operators, which is number of Committable batches sent to
* SinkCommitter. So we skip registering this metric on output and leave this metric to sink
* writer implementations to report.
*/
@Override
protected Output<StreamRecord<CommittableMessage<CommT>>> registerCounterOnOutput(
Output<StreamRecord<CommittableMessage<CommT>>> output,
OperatorMetricGroup operatorMetricGroup) {
return output;
}

private void emit(
int indexOfThisSubtask,
int numberOfParallelSubtasks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
Expand All @@ -40,33 +38,26 @@ class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>>
private static final Logger LOG = LoggerFactory.getLogger(ChainingOutput.class);

protected final Input<T> input;
protected final Counter numRecordsOut;
protected final Counter numRecordsIn;
protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
@Nullable protected final OutputTag<T> outputTag;
protected WatermarkStatus announcedStatus = WatermarkStatus.ACTIVE;

public ChainingOutput(OneInputStreamOperator<T, ?> operator, @Nullable OutputTag<T> outputTag) {
this(operator, operator.getMetricGroup(), outputTag);
}

public ChainingOutput(
Input<T> input,
OperatorMetricGroup operatorMetricGroup,
@Nullable Counter prevNumRecordsOut,
OperatorMetricGroup curOperatorMetricGroup,
@Nullable OutputTag<T> outputTag) {
this.input = input;

{
Counter tmpNumRecordsIn;
try {
OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup.getIOMetricGroup();
tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
} catch (Exception e) {
LOG.warn("An exception occurred during the metrics setup.", e);
tmpNumRecordsIn = new SimpleCounter();
}
numRecordsIn = tmpNumRecordsIn;
if (prevNumRecordsOut != null) {
this.numRecordsOut = prevNumRecordsOut;
} else {
// Uses a dummy counter here to avoid checking the existence of numRecordsOut on the
// per-record path.
this.numRecordsOut = new SimpleCounter();
}

this.numRecordsIn = curOperatorMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
this.outputTag = outputTag;
}

Expand Down Expand Up @@ -94,6 +85,7 @@ protected <X> void pushToOperator(StreamRecord<X> record) {
@SuppressWarnings("unchecked")
StreamRecord<T> castRecord = (StreamRecord<T>) record;

numRecordsOut.inc();
numRecordsIn.inc();
input.setKeyContextElement(castRecord);
input.processElement(castRecord);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package org.apache.flink.streaming.runtime.tasks;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;

Expand All @@ -30,20 +30,13 @@ final class CopyingChainingOutput<T> extends ChainingOutput<T> {

private final TypeSerializer<T> serializer;

public CopyingChainingOutput(
OneInputStreamOperator<T, ?> operator,
TypeSerializer<T> serializer,
@Nullable OutputTag<T> outputTag) {
super(operator, outputTag);
this.serializer = serializer;
}

public CopyingChainingOutput(
Input<T> input,
TypeSerializer<T> serializer,
OperatorMetricGroup operatorMetricGroup,
@Nullable Counter prevRecordsOutCounter,
OperatorMetricGroup curOperatorMetricGroup,
@Nullable OutputTag<T> outputTag) {
super(input, operatorMetricGroup, outputTag);
super(input, prevRecordsOutCounter, curOperatorMetricGroup, outputTag);
this.serializer = serializer;
}

Expand Down Expand Up @@ -76,6 +69,7 @@ protected <X> void pushToOperator(StreamRecord<X> record) {
@SuppressWarnings("unchecked")
StreamRecord<T> castRecord = (StreamRecord<T>) record;

numRecordsOut.inc();
numRecordsIn.inc();
StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
input.setKeyContextElement(copy);
Expand Down
Loading

0 comments on commit c45c4a0

Please sign in to comment.