Skip to content

Commit

Permalink
[FLINK-26420][Connector-base] use numRecordsSendCounter from SinkWrit…
Browse files Browse the repository at this point in the history
…erMetricGroup directly.
  • Loading branch information
JingGe authored and AHeise committed Mar 18, 2022
1 parent 3ca3824 commit 4e0c24a
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
private final SinkWriterMetricGroup metrics;

/* Counter for number of bytes this sink has attempted to send to the destination. */
private final Counter numBytesOutCounter;
private final Counter numBytesSendCounter;

/* Counter for number of records this sink has attempted to send to the destination. */
private final Counter numRecordsOutCounter;
private final Counter numRecordsSendCounter;

/**
* Rate limiting strategy {@code inflightMessages} at any given time, {@code
Expand Down Expand Up @@ -295,8 +295,8 @@ public AsyncSinkWriter(

this.metrics = context.metricGroup();
this.metrics.setCurrentSendTimeGauge(() -> this.ackTime - this.lastSendTimestamp);
this.numBytesOutCounter = this.metrics.getIOMetricGroup().getNumBytesOutCounter();
this.numRecordsOutCounter = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();
this.numBytesSendCounter = this.metrics.getNumBytesSendCounter();
this.numRecordsSendCounter = this.metrics.getNumRecordsSendCounter();

this.fatalExceptionCons =
exception ->
Expand Down Expand Up @@ -417,8 +417,8 @@ private List<RequestEntryT> createNextAvailableBatch() {
batchSizeBytes += requestEntrySize;
}

numRecordsOutCounter.inc(batch.size());
numBytesOutCounter.inc(batchSizeBytes);
numRecordsSendCounter.inc(batch.size());
numBytesSendCounter.inc(batchSizeBytes);

return batch;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ public Optional<Gauge<Long>> getCurrentSendTimeGauge() {
}

public Counter getNumRecordsOutCounter() {
return metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
return metricGroup.getNumRecordsSendCounter();
}

public Counter getNumBytesOutCounter() {
return metricGroup.getIOMetricGroup().getNumBytesOutCounter();
return metricGroup.getNumBytesSendCounter();
}
}

0 comments on commit 4e0c24a

Please sign in to comment.